You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:35:55 UTC
[03/18] cassandra git commit: Transient Replication and Cheap Quorums
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 294731a..4f7cde0 100644
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.service;
import java.util.*;
-import javax.xml.crypto.Data;
-
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.junit.Assert;
@@ -36,13 +34,13 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.streaming.PreviewKind;
@@ -107,13 +105,13 @@ public class ActiveRepairServiceTest
public void testGetNeighborsPlusOne() throws Throwable
{
// generate rf+1 nodes, and ensure that all nodes are returned
- Set<InetAddressAndPort> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ Set<InetAddressAndPort> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas);
expected.remove(FBUtilities.getBroadcastAddressAndPort());
- Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Iterable<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE5).ranges();
Set<InetAddressAndPort> neighbors = new HashSet<>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null).endpoints());
}
assertEquals(expected, neighbors);
}
@@ -124,19 +122,19 @@ public class ActiveRepairServiceTest
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
// generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned
- addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas);
AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
Set<InetAddressAndPort> expected = new HashSet<>();
- for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddressAndPort()))
+ for (Replica replica : ars.getAddressReplicas().get(FBUtilities.getBroadcastAddressAndPort()))
{
- expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+ expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replica.range()).endpoints());
}
expected.remove(FBUtilities.getBroadcastAddressAndPort());
- Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Iterable<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE5).ranges();
Set<InetAddressAndPort> neighbors = new HashSet<>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null).endpoints());
}
assertEquals(expected, neighbors);
}
@@ -147,18 +145,18 @@ public class ActiveRepairServiceTest
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
// generate rf+1 nodes, and ensure that all nodes are returned
- Set<InetAddressAndPort> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ Set<InetAddressAndPort> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas);
expected.remove(FBUtilities.getBroadcastAddressAndPort());
// remove remote endpoints
TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology();
HashSet<InetAddressAndPort> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
expected = Sets.intersection(expected, localEndpoints);
- Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Iterable<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE5).ranges();
Set<InetAddressAndPort> neighbors = new HashSet<>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null).endpoints());
}
assertEquals(expected, neighbors);
}
@@ -169,12 +167,12 @@ public class ActiveRepairServiceTest
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
// generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned
- addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas);
AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
Set<InetAddressAndPort> expected = new HashSet<>();
- for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddressAndPort()))
+ for (Replica replica : ars.getAddressReplicas().get(FBUtilities.getBroadcastAddressAndPort()))
{
- expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+ expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replica.range()).endpoints());
}
expected.remove(FBUtilities.getBroadcastAddressAndPort());
// remove remote endpoints
@@ -182,11 +180,11 @@ public class ActiveRepairServiceTest
HashSet<InetAddressAndPort> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
expected = Sets.intersection(expected, localEndpoints);
- Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Iterable<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE5).ranges();
Set<InetAddressAndPort> neighbors = new HashSet<>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null).endpoints());
}
assertEquals(expected, neighbors);
}
@@ -197,30 +195,30 @@ public class ActiveRepairServiceTest
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
// generate rf*2 nodes, and ensure that only neighbors specified by the hosts are returned
- addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas);
AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
List<InetAddressAndPort> expected = new ArrayList<>();
- for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddressAndPort()))
+ for (Replica replicas : ars.getAddressReplicas().get(FBUtilities.getBroadcastAddressAndPort()))
{
- expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+ expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicas.range()).endpoints());
}
expected.remove(FBUtilities.getBroadcastAddressAndPort());
Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddressAndPort().toString(),expected.get(0).toString());
- Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Iterable<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE5).ranges();
assertEquals(expected.get(0), ActiveRepairService.getNeighbors(KEYSPACE5, ranges,
ranges.iterator().next(),
- null, hosts).iterator().next());
+ null, hosts).endpoints().iterator().next());
}
@Test(expected = IllegalArgumentException.class)
public void testGetNeighborsSpecifiedHostsWithNoLocalHost() throws Throwable
{
- addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+ addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas);
//Dont give local endpoint
Collection<String> hosts = Arrays.asList("127.0.0.3");
- Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
+ Iterable<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE5).ranges();
ActiveRepairService.getNeighbors(KEYSPACE5, ranges, ranges.iterator().next(), null, hosts);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java b/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java
new file mode 100644
index 0000000..63973ea
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.locator.EndpointsByReplica;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.OrderPreservingPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.RangeStreamer;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractEndpointSnitch;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+import static org.apache.cassandra.service.StorageServiceTest.assertMultimapEqualsIgnoreOrder;
+
+/**
+ * This also fairly effectively tests source retrieval for bootstrap, since RangeStreamer
+ * is used to calculate the endpoints to fetch from and check they are alive for both move (RangeRelocator) and
+ * bootstrap (BootStrapper).
+ */
+public class BootstrapTransientTest
+{
+ static InetAddressAndPort aAddress;
+ static InetAddressAndPort bAddress;
+ static InetAddressAndPort cAddress;
+ static InetAddressAndPort dAddress;
+
+ @BeforeClass
+ public static void setUpClass() throws Exception
+ {
+ aAddress = InetAddressAndPort.getByName("127.0.0.1");
+ bAddress = InetAddressAndPort.getByName("127.0.0.2");
+ cAddress = InetAddressAndPort.getByName("127.0.0.3");
+ dAddress = InetAddressAndPort.getByName("127.0.0.4");
+ }
+
+ private final List<InetAddressAndPort> downNodes = new ArrayList<>();
+ Predicate<Replica> alivePredicate = replica -> !downNodes.contains(replica.endpoint());
+
+ private final List<InetAddressAndPort> sourceFilterDownNodes = new ArrayList<>();
+ private final Collection<Predicate<Replica>> sourceFilters = Collections.singleton(replica -> !sourceFilterDownNodes.contains(replica.endpoint()));
+
+ @After
+ public void clearDownNode()
+ {
+ downNodes.clear();
+ sourceFilterDownNodes.clear();
+ }
+
+ @BeforeClass
+ public static void setupDD()
+ {
+ DatabaseDescriptor.daemonInitialization();
+ }
+
+ Token tenToken = new OrderPreservingPartitioner.StringToken("00010");
+ Token twentyToken = new OrderPreservingPartitioner.StringToken("00020");
+ Token thirtyToken = new OrderPreservingPartitioner.StringToken("00030");
+ Token fourtyToken = new OrderPreservingPartitioner.StringToken("00040");
+
+ Range<Token> aRange = new Range<>(thirtyToken, tenToken);
+ Range<Token> bRange = new Range<>(tenToken, twentyToken);
+ Range<Token> cRange = new Range<>(twentyToken, thirtyToken);
+ Range<Token> dRange = new Range<>(thirtyToken, fourtyToken);
+
+ RangesAtEndpoint toFetch = RangesAtEndpoint.of(new Replica(dAddress, dRange, true),
+ new Replica(dAddress, cRange, true),
+ new Replica(dAddress, bRange, false));
+
+ @Test
+ public void testRangeStreamerRangesToFetch() throws Exception
+ {
+ EndpointsByReplica expectedResult = new EndpointsByReplica(ImmutableMap.of(
+ fullReplica(dAddress, dRange), EndpointsForRange.builder(aRange).add(fullReplica(bAddress, aRange)).add(transientReplica(cAddress, aRange)).build(),
+ fullReplica(dAddress, cRange), EndpointsForRange.builder(cRange).add(fullReplica(cAddress, cRange)).add(transientReplica(bAddress, cRange)).build(),
+ transientReplica(dAddress, bRange), EndpointsForRange.builder(bRange).add(transientReplica(aAddress, bRange)).build()));
+
+ invokeCalculateRangesToFetchWithPreferredEndpoints(toFetch, constructTMDs(), expectedResult);
+ }
+
+ private Pair<TokenMetadata, TokenMetadata> constructTMDs()
+ {
+ TokenMetadata tmd = new TokenMetadata();
+ tmd.updateNormalToken(aRange.right, aAddress);
+ tmd.updateNormalToken(bRange.right, bAddress);
+ tmd.updateNormalToken(cRange.right, cAddress);
+ TokenMetadata updated = tmd.cloneOnlyTokenMap();
+ updated.updateNormalToken(dRange.right, dAddress);
+
+ return Pair.create(tmd, updated);
+ }
+
+ private void invokeCalculateRangesToFetchWithPreferredEndpoints(ReplicaCollection<?> toFetch,
+ Pair<TokenMetadata, TokenMetadata> tmds,
+ EndpointsByReplica expectedResult)
+ {
+ DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+
+ EndpointsByReplica result = RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) -> replicas,
+ simpleStrategy(tmds.left),
+ toFetch,
+ true,
+ tmds.left,
+ tmds.right,
+ alivePredicate,
+ "OldNetworkTopologyStrategyTest",
+ sourceFilters);
+ result.asMap().forEach((replica, list) -> System.out.printf("Replica %s, sources %s%n", replica, list));
+ assertMultimapEqualsIgnoreOrder(expectedResult, result);
+
+ }
+
+ private AbstractReplicationStrategy simpleStrategy(TokenMetadata tmd)
+ {
+ IEndpointSnitch snitch = new AbstractEndpointSnitch()
+ {
+ public int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2)
+ {
+ return 0;
+ }
+
+ public String getRack(InetAddressAndPort endpoint)
+ {
+ return "R1";
+ }
+
+ public String getDatacenter(InetAddressAndPort endpoint)
+ {
+ return "DC1";
+ }
+ };
+
+ return new SimpleStrategy("MoveTransientTest",
+ tmd,
+ snitch,
+ com.google.common.collect.ImmutableMap.of("replication_factor", "3/1"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index 8ddc4f0..3c4748e 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -130,10 +130,10 @@ public class LeaveAndBootstrapTest
strategy = getStrategy(keyspaceName, tmd);
for (Token token : keyTokens)
{
- int replicationFactor = strategy.getReplicationFactor();
+ int replicationFactor = strategy.getReplicationFactor().allReplicas;
- HashSet<InetAddressAndPort> actual = new HashSet<>(tmd.getWriteEndpoints(token, keyspaceName, strategy.calculateNaturalEndpoints(token, tmd.cloneOnlyTokenMap())));
- HashSet<InetAddressAndPort> expected = new HashSet<>();
+ Set<InetAddressAndPort> actual = tmd.getWriteEndpoints(token, keyspaceName, strategy.calculateNaturalReplicas(token, tmd.cloneOnlyTokenMap()).forToken(token)).endpoints();
+ Set<InetAddressAndPort> expected = new HashSet<>();
for (int i = 0; i < replicationFactor; i++)
{
@@ -198,8 +198,6 @@ public class LeaveAndBootstrapTest
ApplicationState.STATUS,
valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(7))));
- Collection<InetAddressAndPort> endpoints = null;
-
/* don't require test update every time a new keyspace is added to test/conf/cassandra.yaml */
Map<String, AbstractReplicationStrategy> keyspaceStrategyMap = new HashMap<String, AbstractReplicationStrategy>();
for (int i=1; i<=4; i++)
@@ -263,18 +261,18 @@ public class LeaveAndBootstrapTest
for (int i = 0; i < keyTokens.size(); i++)
{
- endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i)));
+ Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(i))).endpoints();
assertEquals(expectedEndpoints.get(keyspaceName).get(keyTokens.get(i)).size(), endpoints.size());
assertTrue(expectedEndpoints.get(keyspaceName).get(keyTokens.get(i)).containsAll(endpoints));
}
// just to be sure that things still work according to the old tests, run them:
- if (strategy.getReplicationFactor() != 3)
+ if (strategy.getReplicationFactor().allReplicas != 3)
continue;
// tokens 5, 15 and 25 should go three nodes
for (int i=0; i<3; ++i)
{
- endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i)));
+ Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(i))).endpoints();
assertEquals(3, endpoints.size());
assertTrue(endpoints.contains(hosts.get(i+1)));
assertTrue(endpoints.contains(hosts.get(i+2)));
@@ -282,7 +280,7 @@ public class LeaveAndBootstrapTest
}
// token 35 should go to nodes 4, 5, 6, 7 and boot1
- endpoints = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(3)));
+ Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(3))).endpoints();
assertEquals(5, endpoints.size());
assertTrue(endpoints.contains(hosts.get(4)));
assertTrue(endpoints.contains(hosts.get(5)));
@@ -291,7 +289,7 @@ public class LeaveAndBootstrapTest
assertTrue(endpoints.contains(boot1));
// token 45 should go to nodes 5, 6, 7, 0, boot1 and boot2
- endpoints = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(4)));
+ endpoints = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(4))).endpoints();
assertEquals(6, endpoints.size());
assertTrue(endpoints.contains(hosts.get(5)));
assertTrue(endpoints.contains(hosts.get(6)));
@@ -301,7 +299,7 @@ public class LeaveAndBootstrapTest
assertTrue(endpoints.contains(boot2));
// token 55 should go to nodes 6, 7, 8, 0, 1, boot1 and boot2
- endpoints = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(5)));
+ endpoints = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(5))).endpoints();
assertEquals(7, endpoints.size());
assertTrue(endpoints.contains(hosts.get(6)));
assertTrue(endpoints.contains(hosts.get(7)));
@@ -312,7 +310,7 @@ public class LeaveAndBootstrapTest
assertTrue(endpoints.contains(boot2));
// token 65 should go to nodes 7, 8, 9, 0, 1 and boot2
- endpoints = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(6)));
+ endpoints = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(6))).endpoints();
assertEquals(6, endpoints.size());
assertTrue(endpoints.contains(hosts.get(7)));
assertTrue(endpoints.contains(hosts.get(8)));
@@ -322,7 +320,7 @@ public class LeaveAndBootstrapTest
assertTrue(endpoints.contains(boot2));
// token 75 should to go nodes 8, 9, 0, 1, 2 and boot2
- endpoints = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(7)));
+ endpoints = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(7))).endpoints();
assertEquals(6, endpoints.size());
assertTrue(endpoints.contains(hosts.get(8)));
assertTrue(endpoints.contains(hosts.get(9)));
@@ -332,7 +330,7 @@ public class LeaveAndBootstrapTest
assertTrue(endpoints.contains(boot2));
// token 85 should go to nodes 9, 0, 1 and 2
- endpoints = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(8)));
+ endpoints = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(8))).endpoints();
assertEquals(4, endpoints.size());
assertTrue(endpoints.contains(hosts.get(9)));
assertTrue(endpoints.contains(hosts.get(0)));
@@ -340,7 +338,7 @@ public class LeaveAndBootstrapTest
assertTrue(endpoints.contains(hosts.get(2)));
// token 95 should go to nodes 0, 1 and 2
- endpoints = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(9)));
+ endpoints = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(9))).endpoints();
assertEquals(3, endpoints.size());
assertTrue(endpoints.contains(hosts.get(0)));
assertTrue(endpoints.contains(hosts.get(1)));
@@ -385,18 +383,18 @@ public class LeaveAndBootstrapTest
for (int i = 0; i < keyTokens.size(); i++)
{
- endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i)));
+ Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(i))).endpoints();
assertEquals(expectedEndpoints.get(keyspaceName).get(keyTokens.get(i)).size(), endpoints.size());
assertTrue(expectedEndpoints.get(keyspaceName).get(keyTokens.get(i)).containsAll(endpoints));
}
- if (strategy.getReplicationFactor() != 3)
+ if (strategy.getReplicationFactor().allReplicas != 3)
continue;
// leave this stuff in to guarantee the old tests work the way they were supposed to.
// tokens 5, 15 and 25 should go three nodes
for (int i=0; i<3; ++i)
{
- endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i)));
+ Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(i))).endpoints();
assertEquals(3, endpoints.size());
assertTrue(endpoints.contains(hosts.get(i+1)));
assertTrue(endpoints.contains(hosts.get(i+2)));
@@ -404,21 +402,21 @@ public class LeaveAndBootstrapTest
}
// token 35 goes to nodes 4, 5 and boot1
- endpoints = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(3)));
+ Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(3))).endpoints();
assertEquals(3, endpoints.size());
assertTrue(endpoints.contains(hosts.get(4)));
assertTrue(endpoints.contains(hosts.get(5)));
assertTrue(endpoints.contains(boot1));
// token 45 goes to nodes 5, boot1 and node7
- endpoints = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(4)));
+ endpoints = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(4))).endpoints();
assertEquals(3, endpoints.size());
assertTrue(endpoints.contains(hosts.get(5)));
assertTrue(endpoints.contains(boot1));
assertTrue(endpoints.contains(hosts.get(7)));
// token 55 goes to boot1, 7, boot2, 8 and 0
- endpoints = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(5)));
+ endpoints = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(5))).endpoints();
assertEquals(5, endpoints.size());
assertTrue(endpoints.contains(boot1));
assertTrue(endpoints.contains(hosts.get(7)));
@@ -427,7 +425,7 @@ public class LeaveAndBootstrapTest
assertTrue(endpoints.contains(hosts.get(0)));
// token 65 goes to nodes 7, boot2, 8, 0 and 1
- endpoints = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(6)));
+ endpoints = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(6))).endpoints();
assertEquals(5, endpoints.size());
assertTrue(endpoints.contains(hosts.get(7)));
assertTrue(endpoints.contains(boot2));
@@ -436,7 +434,7 @@ public class LeaveAndBootstrapTest
assertTrue(endpoints.contains(hosts.get(1)));
// token 75 goes to nodes boot2, 8, 0, 1 and 2
- endpoints = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(7)));
+ endpoints = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(7))).endpoints();
assertEquals(5, endpoints.size());
assertTrue(endpoints.contains(boot2));
assertTrue(endpoints.contains(hosts.get(8)));
@@ -445,14 +443,14 @@ public class LeaveAndBootstrapTest
assertTrue(endpoints.contains(hosts.get(2)));
// token 85 goes to nodes 0, 1 and 2
- endpoints = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(8)));
+ endpoints = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(8))).endpoints();
assertEquals(3, endpoints.size());
assertTrue(endpoints.contains(hosts.get(0)));
assertTrue(endpoints.contains(hosts.get(1)));
assertTrue(endpoints.contains(hosts.get(2)));
// token 95 goes to nodes 0, 1 and 2
- endpoints = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(9)));
+ endpoints = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(9))).endpoints();
assertEquals(3, endpoints.size());
assertTrue(endpoints.contains(hosts.get(0)));
assertTrue(endpoints.contains(hosts.get(1)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/MoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java
index 7321fba..731a25d 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -23,13 +23,21 @@ import java.net.UnknownHostException;
import java.util.*;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.RangesByEndpoint;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.ReplicaUtils;
import org.apache.cassandra.schema.MigrationManager;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -492,24 +500,25 @@ public class MoveTest
tmd.updateNormalToken(new BigIntegerToken(String.valueOf(token)), host);
}
- private Map.Entry<Range<Token>, Collection<InetAddressAndPort>> generatePendingMapEntry(int start, int end, String... endpoints) throws UnknownHostException
+ private Map.Entry<Range<Token>, EndpointsForRange> generatePendingMapEntry(int start, int end, String... endpoints) throws UnknownHostException
{
- Map<Range<Token>, Collection<InetAddressAndPort>> pendingRanges = new HashMap<>();
- pendingRanges.put(generateRange(start, end), makeAddrs(endpoints));
+ Map<Range<Token>, EndpointsForRange> pendingRanges = new HashMap<>();
+ Range<Token> range = generateRange(start, end);
+ pendingRanges.put(range, makeReplicas(range, endpoints));
return pendingRanges.entrySet().iterator().next();
}
- private Map<Range<Token>, Collection<InetAddressAndPort>> generatePendingRanges(Map.Entry<Range<Token>, Collection<InetAddressAndPort>>... entries)
+ private Map<Range<Token>, EndpointsForRange> generatePendingRanges(Map.Entry<Range<Token>, EndpointsForRange>... entries)
{
- Map<Range<Token>, Collection<InetAddressAndPort>> pendingRanges = new HashMap<>();
- for(Map.Entry<Range<Token>, Collection<InetAddressAndPort>> entry : entries)
+ Map<Range<Token>, EndpointsForRange> pendingRanges = new HashMap<>();
+ for(Map.Entry<Range<Token>, EndpointsForRange> entry : entries)
{
pendingRanges.put(entry.getKey(), entry.getValue());
}
return pendingRanges;
}
- private void assertPendingRanges(TokenMetadata tmd, Map<Range<Token>, Collection<InetAddressAndPort>> pendingRanges, String keyspaceName) throws ConfigurationException
+ private void assertPendingRanges(TokenMetadata tmd, Map<Range<Token>, EndpointsForRange> pendingRanges, String keyspaceName) throws ConfigurationException
{
boolean keyspaceFound = false;
for (String nonSystemKeyspaceName : Schema.instance.getNonLocalStrategyKeyspaces())
@@ -523,15 +532,15 @@ public class MoveTest
assert keyspaceFound;
}
- private void assertMaps(Map<Range<Token>, Collection<InetAddressAndPort>> expected, PendingRangeMaps actual)
+ private void assertMaps(Map<Range<Token>, EndpointsForRange> expected, PendingRangeMaps actual)
{
int sizeOfActual = 0;
- Iterator<Map.Entry<Range<Token>, List<InetAddressAndPort>>> iterator = actual.iterator();
+ Iterator<Map.Entry<Range<Token>, EndpointsForRange.Mutable>> iterator = actual.iterator();
while(iterator.hasNext())
{
- Map.Entry<Range<Token>, List<InetAddressAndPort>> actualEntry = iterator.next();
+ Map.Entry<Range<Token>, EndpointsForRange.Mutable> actualEntry = iterator.next();
assertNotNull(expected.get(actualEntry.getKey()));
- assertEquals(new HashSet<>(expected.get(actualEntry.getKey())), new HashSet<>(actualEntry.getValue()));
+ assertEquals(ImmutableSet.copyOf(expected.get(actualEntry.getKey())), ImmutableSet.copyOf(actualEntry.getValue()));
sizeOfActual++;
}
@@ -589,9 +598,9 @@ public class MoveTest
int numMoved = 0;
for (Token token : keyTokens)
{
- int replicationFactor = strategy.getReplicationFactor();
+ int replicationFactor = strategy.getReplicationFactor().allReplicas;
- HashSet<InetAddressAndPort> actual = new HashSet<>(tmd.getWriteEndpoints(token, keyspaceName, strategy.calculateNaturalEndpoints(token, tmd.cloneOnlyTokenMap())));
+ EndpointsForToken actual = tmd.getWriteEndpoints(token, keyspaceName, strategy.calculateNaturalReplicas(token, tmd.cloneOnlyTokenMap()).forToken(token));
HashSet<InetAddressAndPort> expected = new HashSet<>();
for (int i = 0; i < replicationFactor; i++)
@@ -600,10 +609,10 @@ public class MoveTest
}
if (expected.size() == actual.size()) {
- assertEquals("mismatched endpoint sets", expected, actual);
+ assertEquals("mismatched endpoint sets", expected, actual.endpoints());
} else {
expected.add(hosts.get(MOVING_NODE));
- assertEquals("mismatched endpoint sets", expected, actual);
+ assertEquals("mismatched endpoint sets", expected, actual.endpoints());
numMoved++;
}
}
@@ -648,8 +657,6 @@ public class MoveTest
newTokens.put(movingIndex, newToken);
}
- Collection<InetAddressAndPort> endpoints;
-
tmd = tmd.cloneAfterAllSettled();
ss.setTokenMetadataUnsafe(tmd);
@@ -693,37 +700,18 @@ public class MoveTest
* }
*/
- Multimap<InetAddressAndPort, Range<Token>> keyspace1ranges = keyspaceStrategyMap.get(Simple_RF1_KeyspaceName).getAddressRanges();
- Collection<Range<Token>> ranges1 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.1"));
- assertEquals(1, collectionSize(ranges1));
- assertEquals(generateRange(97, 0), ranges1.iterator().next());
- Collection<Range<Token>> ranges2 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.2"));
- assertEquals(1, collectionSize(ranges2));
- assertEquals(generateRange(0, 10), ranges2.iterator().next());
- Collection<Range<Token>> ranges3 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.3"));
- assertEquals(1, collectionSize(ranges3));
- assertEquals(generateRange(10, 20), ranges3.iterator().next());
- Collection<Range<Token>> ranges4 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.4"));
- assertEquals(1, collectionSize(ranges4));
- assertEquals(generateRange(20, 30), ranges4.iterator().next());
- Collection<Range<Token>> ranges5 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.5"));
- assertEquals(1, collectionSize(ranges5));
- assertEquals(generateRange(30, 40), ranges5.iterator().next());
- Collection<Range<Token>> ranges6 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.6"));
- assertEquals(1, collectionSize(ranges6));
- assertEquals(generateRange(40, 50), ranges6.iterator().next());
- Collection<Range<Token>> ranges7 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.7"));
- assertEquals(1, collectionSize(ranges7));
- assertEquals(generateRange(50, 67), ranges7.iterator().next());
- Collection<Range<Token>> ranges8 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.8"));
- assertEquals(1, collectionSize(ranges8));
- assertEquals(generateRange(67, 70), ranges8.iterator().next());
- Collection<Range<Token>> ranges9 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.9"));
- assertEquals(1, collectionSize(ranges9));
- assertEquals(generateRange(70, 87), ranges9.iterator().next());
- Collection<Range<Token>> ranges10 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.10"));
- assertEquals(1, collectionSize(ranges10));
- assertEquals(generateRange(87, 97), ranges10.iterator().next());
+ RangesByEndpoint keyspace1ranges = keyspaceStrategyMap.get(Simple_RF1_KeyspaceName).getAddressReplicas();
+
+ assertRanges(keyspace1ranges, "127.0.0.1", 97, 0);
+ assertRanges(keyspace1ranges, "127.0.0.2", 0, 10);
+ assertRanges(keyspace1ranges, "127.0.0.3", 10, 20);
+ assertRanges(keyspace1ranges, "127.0.0.4", 20, 30);
+ assertRanges(keyspace1ranges, "127.0.0.5", 30, 40);
+ assertRanges(keyspace1ranges, "127.0.0.6", 40, 50);
+ assertRanges(keyspace1ranges, "127.0.0.7", 50, 67);
+ assertRanges(keyspace1ranges, "127.0.0.8", 67, 70);
+ assertRanges(keyspace1ranges, "127.0.0.9", 70, 87);
+ assertRanges(keyspace1ranges, "127.0.0.10", 87, 97);
/**
@@ -742,37 +730,17 @@ public class MoveTest
* }
*/
- Multimap<InetAddressAndPort, Range<Token>> keyspace3ranges = keyspaceStrategyMap.get(KEYSPACE3).getAddressRanges();
- ranges1 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.1"));
- assertEquals(collectionSize(ranges1), 5);
- assertTrue(ranges1.equals(generateRanges(97, 0, 70, 87, 50, 67, 87, 97, 67, 70)));
- ranges2 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.2"));
- assertEquals(collectionSize(ranges2), 5);
- assertTrue(ranges2.equals(generateRanges(97, 0, 70, 87, 87, 97, 0, 10, 67, 70)));
- ranges3 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.3"));
- assertEquals(collectionSize(ranges3), 5);
- assertTrue(ranges3.equals(generateRanges(97, 0, 70, 87, 87, 97, 0, 10, 10, 20)));
- ranges4 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.4"));
- assertEquals(collectionSize(ranges4), 5);
- assertTrue(ranges4.equals(generateRanges(97, 0, 20, 30, 87, 97, 0, 10, 10, 20)));
- ranges5 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.5"));
- assertEquals(collectionSize(ranges5), 5);
- assertTrue(ranges5.equals(generateRanges(97, 0, 30, 40, 20, 30, 0, 10, 10, 20)));
- ranges6 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.6"));
- assertEquals(collectionSize(ranges6), 5);
- assertTrue(ranges6.equals(generateRanges(40, 50, 30, 40, 20, 30, 0, 10, 10, 20)));
- ranges7 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.7"));
- assertEquals(collectionSize(ranges7), 5);
- assertTrue(ranges7.equals(generateRanges(40, 50, 30, 40, 50, 67, 20, 30, 10, 20)));
- ranges8 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.8"));
- assertEquals(collectionSize(ranges8), 5);
- assertTrue(ranges8.equals(generateRanges(40, 50, 30, 40, 50, 67, 20, 30, 67, 70)));
- ranges9 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.9"));
- assertEquals(collectionSize(ranges9), 5);
- assertTrue(ranges9.equals(generateRanges(40, 50, 70, 87, 30, 40, 50, 67, 67, 70)));
- ranges10 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.10"));
- assertEquals(collectionSize(ranges10), 5);
- assertTrue(ranges10.equals(generateRanges(40, 50, 70, 87, 50, 67, 87, 97, 67, 70)));
+ RangesByEndpoint keyspace3ranges = keyspaceStrategyMap.get(KEYSPACE3).getAddressReplicas();
+ assertRanges(keyspace3ranges, "127.0.0.1", 97, 0, 70, 87, 50, 67, 87, 97, 67, 70);
+ assertRanges(keyspace3ranges, "127.0.0.2", 97, 0, 70, 87, 87, 97, 0, 10, 67, 70);
+ assertRanges(keyspace3ranges, "127.0.0.3", 97, 0, 70, 87, 87, 97, 0, 10, 10, 20);
+ assertRanges(keyspace3ranges, "127.0.0.4", 97, 0, 20, 30, 87, 97, 0, 10, 10, 20);
+ assertRanges(keyspace3ranges, "127.0.0.5", 97, 0, 30, 40, 20, 30, 0, 10, 10, 20);
+ assertRanges(keyspace3ranges, "127.0.0.6", 40, 50, 30, 40, 20, 30, 0, 10, 10, 20);
+ assertRanges(keyspace3ranges, "127.0.0.7", 40, 50, 30, 40, 50, 67, 20, 30, 10, 20);
+ assertRanges(keyspace3ranges, "127.0.0.8", 40, 50, 30, 40, 50, 67, 20, 30, 67, 70);
+ assertRanges(keyspace3ranges, "127.0.0.9", 40, 50, 70, 87, 30, 40, 50, 67, 67, 70);
+ assertRanges(keyspace3ranges, "127.0.0.10", 40, 50, 70, 87, 50, 67, 87, 97, 67, 70);
/**
@@ -790,37 +758,18 @@ public class MoveTest
* /127.0.0.10=[(70,87], (87,97], (67,70]]
* }
*/
- Multimap<InetAddressAndPort, Range<Token>> keyspace4ranges = keyspaceStrategyMap.get(Simple_RF3_KeyspaceName).getAddressRanges();
- ranges1 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.1"));
- assertEquals(collectionSize(ranges1), 3);
- assertTrue(ranges1.equals(generateRanges(97, 0, 70, 87, 87, 97)));
- ranges2 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.2"));
- assertEquals(collectionSize(ranges2), 3);
- assertTrue(ranges2.equals(generateRanges(97, 0, 87, 97, 0, 10)));
- ranges3 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.3"));
- assertEquals(collectionSize(ranges3), 3);
- assertTrue(ranges3.equals(generateRanges(97, 0, 0, 10, 10, 20)));
- ranges4 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.4"));
- assertEquals(collectionSize(ranges4), 3);
- assertTrue(ranges4.equals(generateRanges(20, 30, 0, 10, 10, 20)));
- ranges5 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.5"));
- assertEquals(collectionSize(ranges5), 3);
- assertTrue(ranges5.equals(generateRanges(30, 40, 20, 30, 10, 20)));
- ranges6 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.6"));
- assertEquals(collectionSize(ranges6), 3);
- assertTrue(ranges6.equals(generateRanges(40, 50, 30, 40, 20, 30)));
- ranges7 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.7"));
- assertEquals(collectionSize(ranges7), 3);
- assertTrue(ranges7.equals(generateRanges(40, 50, 30, 40, 50, 67)));
- ranges8 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.8"));
- assertEquals(collectionSize(ranges8), 3);
- assertTrue(ranges8.equals(generateRanges(40, 50, 50, 67, 67, 70)));
- ranges9 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.9"));
- assertEquals(collectionSize(ranges9), 3);
- assertTrue(ranges9.equals(generateRanges(70, 87, 50, 67, 67, 70)));
- ranges10 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.10"));
- assertEquals(collectionSize(ranges10), 3);
- assertTrue(ranges10.equals(generateRanges(70, 87, 87, 97, 67, 70)));
+ RangesByEndpoint keyspace4ranges = keyspaceStrategyMap.get(Simple_RF3_KeyspaceName).getAddressReplicas();
+
+ assertRanges(keyspace4ranges, "127.0.0.1", 97, 0, 70, 87, 87, 97);
+ assertRanges(keyspace4ranges, "127.0.0.2", 97, 0, 87, 97, 0, 10);
+ assertRanges(keyspace4ranges, "127.0.0.3", 97, 0, 0, 10, 10, 20);
+ assertRanges(keyspace4ranges, "127.0.0.4", 20, 30, 0, 10, 10, 20);
+ assertRanges(keyspace4ranges, "127.0.0.5", 30, 40, 20, 30, 10, 20);
+ assertRanges(keyspace4ranges, "127.0.0.6", 40, 50, 30, 40, 20, 30);
+ assertRanges(keyspace4ranges, "127.0.0.7", 40, 50, 30, 40, 50, 67);
+ assertRanges(keyspace4ranges, "127.0.0.8", 40, 50, 50, 67, 67, 70);
+ assertRanges(keyspace4ranges, "127.0.0.9", 70, 87, 50, 67, 67, 70);
+ assertRanges(keyspace4ranges, "127.0.0.10", 70, 87, 87, 97, 67, 70);
// pre-calculate the results.
Map<String, Multimap<Token, InetAddressAndPort>> expectedEndpoints = new HashMap<>();
@@ -876,79 +825,80 @@ public class MoveTest
for (Token token : keyTokens)
{
- endpoints = tmd.getWriteEndpoints(token, keyspaceName, strategy.getNaturalEndpoints(token));
+ Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(token, keyspaceName, strategy.getNaturalReplicasForToken(token)).endpoints();
assertEquals(expectedEndpoints.get(keyspaceName).get(token).size(), endpoints.size());
assertTrue(expectedEndpoints.get(keyspaceName).get(token).containsAll(endpoints));
}
// just to be sure that things still work according to the old tests, run them:
- if (strategy.getReplicationFactor() != 3)
+ if (strategy.getReplicationFactor().allReplicas != 3)
continue;
+ ReplicaCollection<?> replicas = null;
// tokens 5, 15 and 25 should go three nodes
for (int i = 0; i < 3; i++)
{
- endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i)));
- assertEquals(3, endpoints.size());
- assertTrue(endpoints.contains(hosts.get(i+1)));
- assertTrue(endpoints.contains(hosts.get(i+2)));
- assertTrue(endpoints.contains(hosts.get(i+3)));
+ replicas = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(i)));
+ assertEquals(3, replicas.size());
+ assertTrue(replicas.endpoints().contains(hosts.get(i + 1)));
+ assertTrue(replicas.endpoints().contains(hosts.get(i + 2)));
+ assertTrue(replicas.endpoints().contains(hosts.get(i + 3)));
}
// token 35 should go to nodes 4, 5, 6 and boot1
- endpoints = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(3)));
- assertEquals(4, endpoints.size());
- assertTrue(endpoints.contains(hosts.get(4)));
- assertTrue(endpoints.contains(hosts.get(5)));
- assertTrue(endpoints.contains(hosts.get(6)));
- assertTrue(endpoints.contains(boot1));
+ replicas = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(3)));
+ assertEquals(4, replicas.size());
+ assertTrue(replicas.endpoints().contains(hosts.get(4)));
+ assertTrue(replicas.endpoints().contains(hosts.get(5)));
+ assertTrue(replicas.endpoints().contains(hosts.get(6)));
+ assertTrue(replicas.endpoints().contains(boot1));
// token 45 should go to nodes 5, 6, 7 boot1
- endpoints = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(4)));
- assertEquals(4, endpoints.size());
- assertTrue(endpoints.contains(hosts.get(5)));
- assertTrue(endpoints.contains(hosts.get(6)));
- assertTrue(endpoints.contains(hosts.get(7)));
- assertTrue(endpoints.contains(boot1));
+ replicas = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(4)));
+ assertEquals(4, replicas.size());
+ assertTrue(replicas.endpoints().contains(hosts.get(5)));
+ assertTrue(replicas.endpoints().contains(hosts.get(6)));
+ assertTrue(replicas.endpoints().contains(hosts.get(7)));
+ assertTrue(replicas.endpoints().contains(boot1));
// token 55 should go to nodes 6, 7, 8 boot1 and boot2
- endpoints = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(5)));
- assertEquals(5, endpoints.size());
- assertTrue(endpoints.contains(hosts.get(6)));
- assertTrue(endpoints.contains(hosts.get(7)));
- assertTrue(endpoints.contains(hosts.get(8)));
- assertTrue(endpoints.contains(boot1));
- assertTrue(endpoints.contains(boot2));
+ replicas = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(5)));
+ assertEquals(5, replicas.size());
+ assertTrue(replicas.endpoints().contains(hosts.get(6)));
+ assertTrue(replicas.endpoints().contains(hosts.get(7)));
+ assertTrue(replicas.endpoints().contains(hosts.get(8)));
+ assertTrue(replicas.endpoints().contains(boot1));
+ assertTrue(replicas.endpoints().contains(boot2));
// token 65 should go to nodes 6, 7, 8 and boot2
- endpoints = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(6)));
- assertEquals(4, endpoints.size());
- assertTrue(endpoints.contains(hosts.get(6)));
- assertTrue(endpoints.contains(hosts.get(7)));
- assertTrue(endpoints.contains(hosts.get(8)));
- assertTrue(endpoints.contains(boot2));
+ replicas = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(6)));
+ assertEquals(4, replicas.size());
+ assertTrue(replicas.endpoints().contains(hosts.get(6)));
+ assertTrue(replicas.endpoints().contains(hosts.get(7)));
+ assertTrue(replicas.endpoints().contains(hosts.get(8)));
+ assertTrue(replicas.endpoints().contains(boot2));
// token 75 should to go nodes 8, 9, 0 and boot2
- endpoints = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(7)));
- assertEquals(4, endpoints.size());
- assertTrue(endpoints.contains(hosts.get(8)));
- assertTrue(endpoints.contains(hosts.get(9)));
- assertTrue(endpoints.contains(hosts.get(0)));
- assertTrue(endpoints.contains(boot2));
+ replicas = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(7)));
+ assertEquals(4, replicas.size());
+ assertTrue(replicas.endpoints().contains(hosts.get(8)));
+ assertTrue(replicas.endpoints().contains(hosts.get(9)));
+ assertTrue(replicas.endpoints().contains(hosts.get(0)));
+ assertTrue(replicas.endpoints().contains(boot2));
// token 85 should go to nodes 8, 9 and 0
- endpoints = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(8)));
- assertEquals(3, endpoints.size());
- assertTrue(endpoints.contains(hosts.get(8)));
- assertTrue(endpoints.contains(hosts.get(9)));
- assertTrue(endpoints.contains(hosts.get(0)));
+ replicas = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(8)));
+ assertEquals(3, replicas.size());
+ assertTrue(replicas.endpoints().contains(hosts.get(8)));
+ assertTrue(replicas.endpoints().contains(hosts.get(9)));
+ assertTrue(replicas.endpoints().contains(hosts.get(0)));
// token 95 should go to nodes 9, 0 and 1
- endpoints = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(9)));
- assertEquals(3, endpoints.size());
- assertTrue(endpoints.contains(hosts.get(9)));
- assertTrue(endpoints.contains(hosts.get(0)));
- assertTrue(endpoints.contains(hosts.get(1)));
+ replicas = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(9)));
+ assertEquals(3, replicas.size());
+ assertTrue(replicas.endpoints().contains(hosts.get(9)));
+ assertTrue(replicas.endpoints().contains(hosts.get(0)));
+ assertTrue(replicas.endpoints().contains(hosts.get(1)));
}
// all moving nodes are back to the normal state
@@ -1009,6 +959,14 @@ public class MoveTest
return addrs;
}
+ /**
+  * Builds an EndpointsForRange holding one full replica of {@code range} per host, in the
+  * order the hosts are given.
+  *
+  * @param range the range every replica covers
+  * @param hosts host names or addresses, resolved with InetAddressAndPort.getByName
+  * @throws UnknownHostException if a host cannot be resolved
+  */
+ private static EndpointsForRange makeReplicas(Range<Token> range, String... hosts) throws UnknownHostException
+ {
+     EndpointsForRange.Builder builder = EndpointsForRange.builder(range, hosts.length);
+     for (int i = 0; i < hosts.length; i++)
+     {
+         InetAddressAndPort endpoint = InetAddressAndPort.getByName(hosts[i]);
+         builder.add(Replica.fullReplica(endpoint, range));
+     }
+     return builder.build();
+ }
+
private AbstractReplicationStrategy getStrategy(String keyspaceName, TokenMetadata tmd)
{
KeyspaceMetadata ksmd = Schema.instance.getKeyspaceMetadata(keyspaceName);
@@ -1025,7 +983,7 @@ public class MoveTest
return new BigIntegerToken(String.valueOf(10 * position + 7));
}
- private int collectionSize(Collection<?> collection)
+ private static int collectionSize(Collection<?> collection)
{
if (collection.isEmpty())
return 0;
@@ -1057,8 +1015,52 @@ public class MoveTest
return ranges;
}
- private Range<Token> generateRange(int left, int right)
+ /**
+  * Creates the BigIntegerToken built from the decimal string form of {@code v}.
+  */
+ private static Token tk(int v)
+ {
+     String decimal = Integer.toString(v);
+     return new BigIntegerToken(decimal);
+ }
+
+ /**
+  * Creates the token range whose bounds are the tokens for {@code left} and {@code right}.
+  */
+ private static Range<Token> generateRange(int left, int right)
+ {
+     Token leftToken = tk(left);
+     Token rightToken = tk(right);
+     return new Range<>(leftToken, rightToken);
+ }
+
+ /**
+  * Creates a Replica for {@code endpoint} over the range bounded by the tokens for
+  * {@code left} and {@code right}; {@code full} is passed through to the Replica constructor.
+  */
+ private static Replica replica(InetAddressAndPort endpoint, int left, int right, boolean full)
+ {
+     Token start = tk(left);
+     Token end = tk(right);
+     return new Replica(endpoint, start, end, full);
+ }
+
+ /**
+ * Resolves name with InetAddressAndPort.getByName, rethrowing UnknownHostException as an
+ * AssertionError so that callers do not need a throws clause.
+ */
+ private static InetAddressAndPort inet(String name)
{
- return new Range<Token>(new BigIntegerToken(String.valueOf(left)), new BigIntegerToken(String.valueOf(right)));
+ try
+ {
+ return InetAddressAndPort.getByName(name);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
+ /**
+ * Shorthand for replica(endpoint, left, right, true), i.e. with the full flag set.
+ */
+ private static Replica replica(InetAddressAndPort endpoint, int left, int right)
+ {
+ return replica(endpoint, left, right, true);
+ }
+
+ /**
+  * Asserts that the replicas recorded for {@code endpoint} in {@code epReplicas} have the
+  * expected size and contain every expected replica (built with the full flag set).
+  *
+  * @param epReplicas replicas grouped by endpoint
+  * @param endpoint   host name or address of the endpoint to check
+  * @param rangePairs flattened (left, right) token values, one pair per expected range
+  * @throws IllegalArgumentException if an odd number of range bounds is supplied
+  */
+ private static void assertRanges(RangesByEndpoint epReplicas, String endpoint, int... rangePairs)
+ {
+     // a malformed call is a bug in the test itself, not a test failure
+     if (rangePairs.length % 2 != 0)
+         throw new IllegalArgumentException("assertRanges argument count should be even");
+
+     InetAddressAndPort ep = inet(endpoint);
+     List<Replica> expected = new ArrayList<>(rangePairs.length / 2);
+     for (int i = 0; i < rangePairs.length; i += 2)
+         expected.add(replica(ep, rangePairs[i], rangePairs[i + 1]));
+
+     RangesAtEndpoint actual = epReplicas.get(ep);
+     assertEquals(expected.size(), actual.size());
+     for (Replica replica : expected)
+     {
+         // membership is checked per replica, so order does not matter; on a miss the whole
+         // collections are compared so the failure message shows both sides
+         if (!actual.contains(replica))
+             assertEquals(RangesAtEndpoint.copyOf(expected), actual);
+     }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/MoveTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTransientTest.java b/test/unit/org/apache/cassandra/service/MoveTransientTest.java
new file mode 100644
index 0000000..1e24735
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/MoveTransientTest.java
@@ -0,0 +1,638 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Predicate;
+import org.apache.cassandra.locator.EndpointsByReplica;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.RangesByEndpoint;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.RangeStreamer;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractEndpointSnitch;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+import static org.apache.cassandra.service.StorageServiceTest.assertMultimapEqualsIgnoreOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the stream and fetch range calculation for a move when transient replicas are involved.
+ * This also fairly effectively tests source retrieval for bootstrap, since RangeStreamer is used
+ * to calculate the endpoints to fetch from and check they are alive for both move (RangeRelocator)
+ * and bootstrap.
+ */
+public class MoveTransientTest
+{
+ private static final Logger logger = LoggerFactory.getLogger(MoveTransientTest.class);
+
+ static InetAddressAndPort aAddress;
+ static InetAddressAndPort bAddress;
+ static InetAddressAndPort cAddress;
+ static InetAddressAndPort dAddress;
+ static InetAddressAndPort eAddress;
+
+ /**
+ * Resolves the addresses of the five test nodes A-E (127.0.0.1 through 127.0.0.5).
+ */
+ @BeforeClass
+ public static void setUpClass() throws Exception
+ {
+ aAddress = InetAddressAndPort.getByName("127.0.0.1");
+ bAddress = InetAddressAndPort.getByName("127.0.0.2");
+ cAddress = InetAddressAndPort.getByName("127.0.0.3");
+ dAddress = InetAddressAndPort.getByName("127.0.0.4");
+ eAddress = InetAddressAndPort.getByName("127.0.0.5");
+ }
+
+ // Endpoints that alivePredicate reports as not alive; emptied after each test by clearDownNode().
+ private final List<InetAddressAndPort> downNodes = new ArrayList<>();
+ Predicate<Replica> alivePredicate = replica -> !downNodes.contains(replica.endpoint());
+
+ // Endpoints rejected by the single source filter below; kept separate from downNodes.
+ private final List<InetAddressAndPort> sourceFilterDownNodes = new ArrayList<>();
+ private final Collection<Predicate<Replica>> sourceFilters = Collections.singleton(replica -> !sourceFilterDownNodes.contains(replica.endpoint()));
+
+ /**
+ * Empties both down-node lists after each test so that one test's failures do not leak into the next.
+ */
+ @After
+ public void clearDownNode()
+ {
+ downNodes.clear();
+ sourceFilterDownNodes.clear();
+ }
+
+ /**
+ * Runs DatabaseDescriptor.daemonInitialization() once before the tests in this class.
+ */
+ @BeforeClass
+ public static void setupDD()
+ {
+ DatabaseDescriptor.daemonInitialization();
+ }
+
+ Token oneToken = new RandomPartitioner.BigIntegerToken("1");
+ Token twoToken = new RandomPartitioner.BigIntegerToken("2");
+ Token threeToken = new RandomPartitioner.BigIntegerToken("3");
+ Token fourToken = new RandomPartitioner.BigIntegerToken("4");
+ Token sixToken = new RandomPartitioner.BigIntegerToken("6");
+ Token sevenToken = new RandomPartitioner.BigIntegerToken("7");
+ Token nineToken = new RandomPartitioner.BigIntegerToken("9");
+ Token elevenToken = new RandomPartitioner.BigIntegerToken("11");
+ Token fourteenToken = new RandomPartitioner.BigIntegerToken("14");
+
+ // Ranges of the five-node ring used by every test: A (1,3], B (3,6], C (6,9], D (9,11], E (11,1].
+ Range<Token> aRange = new Range<>(oneToken, threeToken);
+ Range<Token> bRange = new Range<>(threeToken, sixToken);
+ Range<Token> cRange = new Range<>(sixToken, nineToken);
+ Range<Token> dRange = new Range<>(nineToken, elevenToken);
+ Range<Token> eRange = new Range<>(elevenToken, oneToken);
+
+
+ // What A replicates before any move: aRange and eRange with the boolean flag true, dRange with it false
+ // (the tests below treat true as a full replica and false as a transient one).
+ RangesAtEndpoint current = RangesAtEndpoint.of(new Replica(aAddress, aRange, true),
+ new Replica(aAddress, eRange, true),
+ new Replica(aAddress, dRange, false));
+
+
+ /**
+ * Ring with start A 1-3 B 3-6 C 6-9 D 9-11 E 11-1
+ * A's token moves from 3 to 4.
+ * <p>
+ * Result is A gains some range: the left side of the result is empty and the right side
+ * holds a full replica of 3-4.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCalculateStreamAndFetchRangesMoveForward() throws Exception
+ {
+ calculateStreamAndFetchRangesMoveForward();
+ }
+
+ /**
+  * Computes and checks the stream/fetch ranges for A moving its token from 3 to 4.
+  *
+  * @return the pair returned by StorageService.calculateStreamAndFetchRanges
+  */
+ private Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRangesMoveForward() throws Exception
+ {
+     // only A's first range changes, growing from (1,3] to (1,4]
+     Range<Token> grownRange = new Range<>(oneToken, fourToken);
+     Replica first = new Replica(aAddress, grownRange, true);
+     Replica second = new Replica(aAddress, eRange, true);
+     Replica third = new Replica(aAddress, dRange, false);
+     RangesAtEndpoint afterMove = RangesAtEndpoint.of(first, second, third);
+
+     Pair<RangesAtEndpoint, RangesAtEndpoint> streamAndFetch = StorageService.calculateStreamAndFetchRanges(current, afterMove);
+     assertContentsIgnoreOrder(streamAndFetch.left);
+     assertContentsIgnoreOrder(streamAndFetch.right, fullReplica(aAddress, threeToken, fourToken));
+     return streamAndFetch;
+ }
+
+ /**
+ * Ring with start A 1-3 B 3-6 C 6-9 D 9-11 E 11-1
+ * A's token moves from 3 to 14
+ * <p>
+ * Result is A loses range and it must be streamed; the right side of the result is
+ * non-empty as well (6-9 transient and 9-11 full).
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCalculateStreamAndFetchRangesMoveBackwardBetween() throws Exception
+ {
+ calculateStreamAndFetchRangesMoveBackwardBetween();
+ }
+
+ /**
+  * Computes and checks the stream/fetch ranges for A moving its token from 3 to 14.
+  *
+  * @return the pair returned by StorageService.calculateStreamAndFetchRanges
+  */
+ public Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRangesMoveBackwardBetween() throws Exception
+ {
+     // after the move A holds (11,14] and dRange with the flag true, and cRange with it false
+     Range<Token> movedRange = new Range<>(elevenToken, fourteenToken);
+     Replica first = new Replica(aAddress, movedRange, true);
+     Replica second = new Replica(aAddress, dRange, true);
+     Replica third = new Replica(aAddress, cRange, false);
+     RangesAtEndpoint afterMove = RangesAtEndpoint.of(first, second, third);
+
+     Pair<RangesAtEndpoint, RangesAtEndpoint> streamAndFetch = StorageService.calculateStreamAndFetchRanges(current, afterMove);
+     assertContentsIgnoreOrder(streamAndFetch.left,
+                               fullReplica(aAddress, oneToken, threeToken),
+                               fullReplica(aAddress, fourteenToken, oneToken));
+     assertContentsIgnoreOrder(streamAndFetch.right,
+                               transientReplica(aAddress, sixToken, nineToken),
+                               fullReplica(aAddress, nineToken, elevenToken));
+     return streamAndFetch;
+ }
+
+ /**
+ * Ring with start A 1-3 B 3-6 C 6-9 D 9-11 E 11-1
+ * A's token moves from 3 to 2
+ * <p>
+ * Result is A loses range (2-3) and it must be streamed; the right side of the result is empty.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCalculateStreamAndFetchRangesMoveBackward() throws Exception
+ {
+ calculateStreamAndFetchRangesMoveBackward();
+ }
+
+ /**
+  * Computes and checks the stream/fetch ranges for A moving its token from 3 to 2.
+  *
+  * @return the pair returned by StorageService.calculateStreamAndFetchRanges
+  */
+ private Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRangesMoveBackward() throws Exception
+ {
+     // only A's first range changes, shrinking from (1,3] to (1,2]
+     Range<Token> shrunkRange = new Range<>(oneToken, twoToken);
+     Replica first = new Replica(aAddress, shrunkRange, true);
+     Replica second = new Replica(aAddress, eRange, true);
+     Replica third = new Replica(aAddress, dRange, false);
+     RangesAtEndpoint afterMove = RangesAtEndpoint.of(first, second, third);
+
+     Pair<RangesAtEndpoint, RangesAtEndpoint> streamAndFetch = StorageService.calculateStreamAndFetchRanges(current, afterMove);
+
+     // Moving backwards requires no fetch: we already fully replicate counter clockwise.
+     // The transient replica does transiently replicate slightly more, but that is addressed by cleanup.
+     // Only the (2,3] slice that A gives up appears on the left side.
+     assertContentsIgnoreOrder(streamAndFetch.left, fullReplica(aAddress, twoToken, threeToken));
+     assertContentsIgnoreOrder(streamAndFetch.right);
+
+     return streamAndFetch;
+ }
+
+ /**
+  * Ring with start A 1-3 B 3-6 C 6-9 D 9-11 E 11-1
+  * A's token moves from 3 to 7
+  * <p>
+  * Computes and checks the resulting stream/fetch ranges.
+  *
+  * @return the pair returned by StorageService.calculateStreamAndFetchRanges
+  * @throws Exception
+  */
+ private Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRangesMoveForwardBetween() throws Exception
+ {
+     // after the move A holds (6,7] and (1,6] with the flag true, and eRange with it false
+     Range<Token> movedRange = new Range<>(sixToken, sevenToken);
+     Range<Token> inheritedRange = new Range<>(oneToken, sixToken);
+     Replica first = new Replica(aAddress, movedRange, true);
+     Replica second = new Replica(aAddress, inheritedRange, true);
+     Replica third = new Replica(aAddress, eRange, false);
+     RangesAtEndpoint afterMove = RangesAtEndpoint.of(first, second, third);
+
+     Pair<RangesAtEndpoint, RangesAtEndpoint> streamAndFetch = StorageService.calculateStreamAndFetchRanges(current, afterMove);
+
+     assertContentsIgnoreOrder(streamAndFetch.left,
+                               fullReplica(aAddress, elevenToken, oneToken),
+                               transientReplica(aAddress, nineToken, elevenToken));
+     assertContentsIgnoreOrder(streamAndFetch.right,
+                               fullReplica(aAddress, threeToken, sixToken),
+                               fullReplica(aAddress, sixToken, sevenToken));
+     return streamAndFetch;
+ }
+
+ /**
+ * Ring with start A 1-3 B 3-6 C 6-9 D 9-11 E 11-1
+ * A's token moves from 3 to 7
+ * <p>
+ * The expected stream and fetch ranges are asserted inside
+ * calculateStreamAndFetchRangesMoveForwardBetween.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCalculateStreamAndFetchRangesMoveForwardBetween() throws Exception
+ {
+ calculateStreamAndFetchRangesMoveForwardBetween();
+ }
+
+ /**
+ * Construct the ring state for calculateStreamAndFetchRangesMoveBackwardBetween
+ * Where are A moves from 3 to 14
+ * @return
+ */
+ private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveBackwardBetween()
+ {
+ TokenMetadata tmd = new TokenMetadata();
+ tmd.updateNormalToken(aRange.right, aAddress);
+ tmd.updateNormalToken(bRange.right, bAddress);
+ tmd.updateNormalToken(cRange.right, cAddress);
+ tmd.updateNormalToken(dRange.right, dAddress);
+ tmd.updateNormalToken(eRange.right, eAddress);
+ tmd.addMovingEndpoint(fourteenToken, aAddress);
+ TokenMetadata updated = tmd.cloneAfterAllSettled();
+
+ return Pair.create(tmd, updated);
+ }
+
+
+ /**
+ * Construct the ring state for calculateStreamAndFetchRangesMoveForwardBetween
+ * Where are A moves from 3 to 7
+ * @return
+ */
+ private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveForwardBetween()
+ {
+ TokenMetadata tmd = new TokenMetadata();
+ tmd.updateNormalToken(aRange.right, aAddress);
+ tmd.updateNormalToken(bRange.right, bAddress);
+ tmd.updateNormalToken(cRange.right, cAddress);
+ tmd.updateNormalToken(dRange.right, dAddress);
+ tmd.updateNormalToken(eRange.right, eAddress);
+ tmd.addMovingEndpoint(sevenToken, aAddress);
+ TokenMetadata updated = tmd.cloneAfterAllSettled();
+
+ return Pair.create(tmd, updated);
+ }
+
+ /**
+  * Builds the ring state for the "move backward" tests: A..E are registered as normal owners of the right
+  * bounds of aRange..eRange, and A is then marked as moving to twoToken.
+  *
+  * @return pair of (ring with A's move still pending, the result of cloneAfterAllSettled() on that ring)
+  */
+ private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveBackward()
+ {
+     TokenMetadata pending = new TokenMetadata();
+
+     // Normal tokens first, in ring-owner order A..E
+     pending.updateNormalToken(aRange.right, aAddress);
+     pending.updateNormalToken(bRange.right, bAddress);
+     pending.updateNormalToken(cRange.right, cAddress);
+     pending.updateNormalToken(dRange.right, dAddress);
+     pending.updateNormalToken(eRange.right, eAddress);
+
+     // The move under test
+     pending.addMovingEndpoint(twoToken, aAddress);
+
+     return Pair.create(pending, pending.cloneAfterAllSettled());
+ }
+
+ /**
+  * Builds the ring state for the "move forward" tests: A..E are registered as normal owners of the right
+  * bounds of aRange..eRange, and A is then marked as moving to fourToken.
+  *
+  * @return pair of (ring with A's move still pending, the result of cloneAfterAllSettled() on that ring)
+  */
+ private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveForward()
+ {
+     TokenMetadata pending = new TokenMetadata();
+
+     // Normal tokens first, in ring-owner order A..E
+     pending.updateNormalToken(aRange.right, aAddress);
+     pending.updateNormalToken(bRange.right, bAddress);
+     pending.updateNormalToken(cRange.right, cAddress);
+     pending.updateNormalToken(dRange.right, dAddress);
+     pending.updateNormalToken(eRange.right, eAddress);
+
+     // The move under test
+     pending.addMovingEndpoint(fourToken, aAddress);
+
+     return Pair.create(pending, pending.cloneAfterAllSettled());
+ }
+
+
+ @Test
+ public void testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints() throws Exception
+ {
+     EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable();
+
+     // C is the expected full source for (three, six] unless a caller has listed it in either
+     // down set, in which case B is expected instead
+     InetAddressAndPort fullSource = cAddress;
+     if (downNodes.contains(cAddress) || sourceFilterDownNodes.contains(cAddress))
+         fullSource = bAddress;
+
+     // (six, seven]: A is expected to fetch from a full source (D) and a transient source (E)
+     expectedResult.put(fullReplica(aAddress, sixToken, sevenToken),
+                        fullReplica(dAddress, sixToken, nineToken));
+     expectedResult.put(fullReplica(aAddress, sixToken, sevenToken),
+                        transientReplica(eAddress, sixToken, nineToken));
+
+     // (three, six]: likewise a full source (C or B) and a transient source (D)
+     expectedResult.put(fullReplica(aAddress, threeToken, sixToken),
+                        fullReplica(fullSource, threeToken, sixToken));
+     expectedResult.put(fullReplica(aAddress, threeToken, sixToken),
+                        transientReplica(dAddress, threeToken, sixToken));
+
+     RangesAtEndpoint toFetch = calculateStreamAndFetchRangesMoveForwardBetween().right;
+     Pair<TokenMetadata, TokenMetadata> tmds = constructTMDsMoveForwardBetween();
+     invokeCalculateRangesToFetchWithPreferredEndpoints(toFetch, tmds, expectedResult.asImmutableView());
+ }
+
+ @Test
+ public void testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodes() throws Exception
+ {
+     // Taking either of these nodes down must make the base test fail with an IllegalStateException
+     InetAddressAndPort[] requiredSources = {dAddress, eAddress};
+     for (InetAddressAndPort downNode : requiredSources)
+     {
+         downNodes.clear();
+         downNodes.add(downNode);
+
+         IllegalStateException caught = null;
+         try
+         {
+             testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
+         }
+         catch (IllegalStateException ise)
+         {
+             caught = ise;
+         }
+
+         assertTrue("Didn't throw for " + downNode, caught != null);
+         caught.printStackTrace();
+
+         // The failure has to name the node that was taken down
+         String message = caught.getMessage();
+         assertTrue(downNode.toString(),
+                    message.startsWith("A node required to move the data consistently is down:")
+                    && message.contains(downNode.toString()));
+     }
+
+     // With only C down nothing should be thrown: another full replica is available
+     downNodes.clear();
+     downNodes.add(cAddress);
+     testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
+ }
+
+ @Test
+ public void testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter() throws Exception
+ {
+     // Filtering out either of these nodes must make the base test fail with an IllegalStateException
+     InetAddressAndPort[] requiredSources = {dAddress, eAddress};
+     for (InetAddressAndPort downNode : requiredSources)
+     {
+         sourceFilterDownNodes.clear();
+         sourceFilterDownNodes.add(downNode);
+
+         IllegalStateException caught = null;
+         try
+         {
+             testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
+         }
+         catch (IllegalStateException ise)
+         {
+             caught = ise;
+         }
+
+         assertTrue("Didn't throw for " + downNode, caught != null);
+         caught.printStackTrace();
+
+         // The failure has to name the node that was filtered out
+         String message = caught.getMessage();
+         assertTrue(downNode.toString(),
+                    message.startsWith("Necessary replicas for strict consistency were removed by source filters:")
+                    && message.contains(downNode.toString()));
+     }
+
+     // With only C filtered nothing should be thrown: another full replica is available
+     sourceFilterDownNodes.clear();
+     sourceFilterDownNodes.add(cAddress);
+     testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
+ }
+
+ @Test
+ public void testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpoints() throws Exception
+ {
+     EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable();
+
+     // A is expected to fetch (nine, eleven] as a full replica from E's full replica ...
+     Replica fullTarget = fullReplica(aAddress, nineToken, elevenToken);
+     Replica fullSource = fullReplica(eAddress, nineToken, elevenToken);
+     expectedResult.put(fullTarget, fullSource);
+
+     // ... and (six, nine] as a transient replica from E's transient replica
+     Replica transientTarget = transientReplica(aAddress, sixToken, nineToken);
+     Replica transientSource = transientReplica(eAddress, sixToken, nineToken);
+     expectedResult.put(transientTarget, transientSource);
+
+     RangesAtEndpoint toFetch = calculateStreamAndFetchRangesMoveBackwardBetween().right;
+     Pair<TokenMetadata, TokenMetadata> tmds = constructTMDsMoveBackwardBetween();
+     invokeCalculateRangesToFetchWithPreferredEndpoints(toFetch, tmds, expectedResult.asImmutableView());
+ }
+
+ // Re-runs the backward-between fetch test with E added to downNodes; the test passes only if an
+ // IllegalStateException escapes (see the expected attribute below).
+ @Test(expected = IllegalStateException.class)
+ public void testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodes() throws Exception
+ {
+ //Any replica can be the full replica so this will always fail on the transient range
+ // (E is the only source the base test expects for both of its ranges.)
+ downNodes.add(eAddress);
+ testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
+ }
+
+ // Re-runs the backward-between fetch test with E added to sourceFilterDownNodes; the test passes only
+ // if an IllegalStateException escapes (see the expected attribute below).
+ @Test(expected = IllegalStateException.class)
+ public void testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter() throws Exception
+ {
+ //Any replica can be the full replica so this will always fail on the transient range
+ // (E is the only source the base test expects for both of its ranges.)
+ sourceFilterDownNodes.add(eAddress);
+ testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
+ }
+
+
+ // No down-node variant of this test exists because nothing needs to be fetched
+ @Test
+ public void testMoveBackwardCalculateRangesToFetchWithPreferredEndpoints() throws Exception
+ {
+     // A backward move is expected to fetch nothing, so the expectation stays empty and this
+     // mostly checks that the calculation copes with an empty fetch set
+     EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable();
+
+     RangesAtEndpoint toFetch = calculateStreamAndFetchRangesMoveBackward().right;
+     Pair<TokenMetadata, TokenMetadata> tmds = constructTMDsMoveBackward();
+     invokeCalculateRangesToFetchWithPreferredEndpoints(toFetch, tmds, expectedResult.asImmutableView());
+ }
+
+ @Test
+ public void testMoveForwardCalculateRangesToFetchWithPreferredEndpoints() throws Exception
+ {
+     EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable();
+
+     // C is the expected full source unless a caller has listed it in either down set,
+     // in which case B is expected instead
+     InetAddressAndPort fullSource = cAddress;
+     if (downNodes.contains(cAddress) || sourceFilterDownNodes.contains(cAddress))
+         fullSource = bAddress;
+
+     // (three, four]: A is expected to fetch from a full source (C or B) and a transient source (D)
+     expectedResult.put(fullReplica(aAddress, threeToken, fourToken),
+                        fullReplica(fullSource, threeToken, sixToken));
+     expectedResult.put(fullReplica(aAddress, threeToken, fourToken),
+                        transientReplica(dAddress, threeToken, sixToken));
+
+     RangesAtEndpoint toFetch = calculateStreamAndFetchRangesMoveForward().right;
+     Pair<TokenMetadata, TokenMetadata> tmds = constructTMDsMoveForward();
+     invokeCalculateRangesToFetchWithPreferredEndpoints(toFetch, tmds, expectedResult.asImmutableView());
+ }
+
+ /**
+  * Runs the forward-move fetch test with nodes marked down: with D in downNodes the calculation must fail
+  * with an IllegalStateException that names D; with only C in downNodes it must still succeed.
+  */
+ @Test
+ public void testMoveForwardCalculateRangesToFetchWithPreferredEndpointsDownNodes() throws Exception
+ {
+     downNodes.add(dAddress);
+     boolean threw = false;
+     try
+     {
+         testMoveForwardCalculateRangesToFetchWithPreferredEndpoints();
+     }
+     catch (IllegalStateException ise)
+     {
+         ise.printStackTrace();
+         // The failure has to name the node that was taken down
+         assertTrue(dAddress.toString(),
+                    ise.getMessage().startsWith("A node required to move the data consistently is down:")
+                    && ise.getMessage().contains(dAddress.toString()));
+         threw = true;
+     }
+     assertTrue("Didn't throw for " + dAddress, threw);
+
+     //Shouldn't throw because another full replica is available.
+     // This must be the plain forward-move test, not the ...MoveForwardBetween... variant, so that the
+     // C-or-B fallback in that test's expectation is actually exercised for this scenario.
+     downNodes.clear();
+     downNodes.add(cAddress);
+     testMoveForwardCalculateRangesToFetchWithPreferredEndpoints();
+ }
+
+ /**
+  * Runs the forward-move fetch test with nodes excluded via sourceFilterDownNodes: with D excluded the
+  * calculation must fail with an IllegalStateException that names D; with only C excluded it must still succeed.
+  */
+ @Test
+ public void testMoveForwardCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter() throws Exception
+ {
+     sourceFilterDownNodes.add(dAddress);
+     boolean threw = false;
+     try
+     {
+         testMoveForwardCalculateRangesToFetchWithPreferredEndpoints();
+     }
+     catch (IllegalStateException ise)
+     {
+         ise.printStackTrace();
+         // The failure has to name the node that was filtered out
+         assertTrue(dAddress.toString(),
+                    ise.getMessage().startsWith("Necessary replicas for strict consistency were removed by source filters:")
+                    && ise.getMessage().contains(dAddress.toString()));
+         threw = true;
+     }
+     assertTrue("Didn't throw for " + dAddress, threw);
+
+     //Shouldn't throw because another full replica is available.
+     // This must be the plain forward-move test, not the ...MoveForwardBetween... variant, so that the
+     // C-or-B fallback in that test's expectation is actually exercised for this scenario.
+     sourceFilterDownNodes.clear();
+     sourceFilterDownNodes.add(cAddress);
+     testMoveForwardCalculateRangesToFetchWithPreferredEndpoints();
+ }
+
+ /**
+  * Enables transient replication, runs RangeStreamer.calculateRangesToFetchWithPreferredEndpoints for the
+  * given fetch ranges and ring states, logs the outcome and compares it with the expectation ignoring order.
+  *
+  * @param toFetch        ranges the moving node needs to fetch
+  * @param tmds           ring metadata pair; left is passed as the current ring, right as the updated ring
+  * @param expectedResult expected mapping to compare against
+  */
+ private void invokeCalculateRangesToFetchWithPreferredEndpoints(RangesAtEndpoint toFetch,
+                                                                 Pair<TokenMetadata, TokenMetadata> tmds,
+                                                                 EndpointsByReplica expectedResult)
+ {
+     DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+
+     TokenMetadata before = tmds.left;
+     TokenMetadata after = tmds.right;
+     AbstractReplicationStrategy strategy = simpleStrategy(before);
+
+     // The sorter orders candidate replicas by endpoint, descending (note the swapped compareTo operands)
+     EndpointsByReplica result = RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(
+         (address, replicas) -> replicas.sorted((lhs, rhs) -> rhs.endpoint().compareTo(lhs.endpoint())),
+         strategy,
+         toFetch,
+         true,
+         before,
+         after,
+         alivePredicate,
+         "OldNetworkTopologyStrategyTest",
+         sourceFilters);
+
+     logger.info("Ranges to fetch with preferred endpoints");
+     logger.info(result.toString());
+     assertMultimapEqualsIgnoreOrder(expectedResult, result);
+ }
+
+ /**
+  * Creates a SimpleStrategy named "MoveTransientTest" over the given ring, configured with
+  * replication_factor "3/1" (presumably three replicas of which one is transient) and a stub snitch.
+  *
+  * @param tmd ring metadata the strategy should operate on
+  * @return the configured strategy
+  */
+ private AbstractReplicationStrategy simpleStrategy(TokenMetadata tmd)
+ {
+     // Stub snitch: every endpoint reports the same DC and rack, and no replica compares as closer
+     IEndpointSnitch snitch = new AbstractEndpointSnitch()
+     {
+         @Override
+         public String getDatacenter(InetAddressAndPort endpoint)
+         {
+             return "DC1";
+         }
+
+         @Override
+         public String getRack(InetAddressAndPort endpoint)
+         {
+             return "R1";
+         }
+
+         @Override
+         public int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2)
+         {
+             return 0;
+         }
+     };
+
+     com.google.common.collect.ImmutableMap<String, String> options =
+         com.google.common.collect.ImmutableMap.of("replication_factor", "3/1");
+
+     return new SimpleStrategy("MoveTransientTest", tmd, snitch, options);
+ }
+
+ @Test
+ public void testMoveForwardBetweenCalculateRangesToStreamWithPreferredEndpoints() throws Exception
+ {
+     DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+     RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable();
+
+     // B is the only expected stream target: (nine, eleven] as transient and (eleven, one] as full
+     Replica transientToB = transientReplica(bAddress, nineToken, elevenToken);
+     expectedResult.put(bAddress, transientToB);
+     Replica fullToB = fullReplica(bAddress, elevenToken, oneToken);
+     expectedResult.put(bAddress, fullToB);
+
+     RangesAtEndpoint toStream = calculateStreamAndFetchRangesMoveForwardBetween().left;
+     Pair<TokenMetadata, TokenMetadata> tmds = constructTMDsMoveForwardBetween();
+     invokeCalculateRangesToStreamWithPreferredEndpoints(toStream, tmds, expectedResult.asImmutableView());
+ }
+
+ @Test
+ public void testMoveBackwardBetweenCalculateRangesToStreamWithPreferredEndpoints() throws Exception
+ {
+     RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable();
+
+     // Expected stream targets, grouped per receiving endpoint
+     // B: (fourteen, one] as full
+     Replica fullToB = fullReplica(bAddress, fourteenToken, oneToken);
+     expectedResult.put(bAddress, fullToB);
+
+     // D: (one, three] as transient
+     Replica transientToD = transientReplica(dAddress, oneToken, threeToken);
+     expectedResult.put(dAddress, transientToD);
+
+     // C: (one, three] as full and (fourteen, one] as transient
+     Replica fullToC = fullReplica(cAddress, oneToken, threeToken);
+     expectedResult.put(cAddress, fullToC);
+     Replica transientToC = transientReplica(cAddress, fourteenToken, oneToken);
+     expectedResult.put(cAddress, transientToC);
+
+     RangesAtEndpoint toStream = calculateStreamAndFetchRangesMoveBackwardBetween().left;
+     Pair<TokenMetadata, TokenMetadata> tmds = constructTMDsMoveBackwardBetween();
+     invokeCalculateRangesToStreamWithPreferredEndpoints(toStream, tmds, expectedResult.asImmutableView());
+ }
+
+ @Test
+ public void testMoveBackwardCalculateRangesToStreamWithPreferredEndpoints() throws Exception
+ {
+     RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable();
+
+     // (two, three] is expected to be streamed to C as full and to D as transient
+     Replica fullToC = fullReplica(cAddress, twoToken, threeToken);
+     expectedResult.put(cAddress, fullToC);
+     Replica transientToD = transientReplica(dAddress, twoToken, threeToken);
+     expectedResult.put(dAddress, transientToD);
+
+     RangesAtEndpoint toStream = calculateStreamAndFetchRangesMoveBackward().left;
+     Pair<TokenMetadata, TokenMetadata> tmds = constructTMDsMoveBackward();
+     invokeCalculateRangesToStreamWithPreferredEndpoints(toStream, tmds, expectedResult.asImmutableView());
+ }
+
+ @Test
+ public void testMoveForwardCalculateRangesToStreamWithPreferredEndpoints() throws Exception
+ {
+     // A forward move only acquires range and gives none up, so the expected stream set stays empty
+     RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable();
+
+     RangesAtEndpoint toStream = calculateStreamAndFetchRangesMoveForward().left;
+     Pair<TokenMetadata, TokenMetadata> tmds = constructTMDsMoveForward();
+     invokeCalculateRangesToStreamWithPreferredEndpoints(toStream, tmds, expectedResult.asImmutableView());
+ }
+
+ /**
+  * Enables transient replication, asks a fresh StorageService.RangeRelocator which endpoints the given
+  * ranges should be streamed to, logs the outcome and compares it with the expectation ignoring order.
+  *
+  * @param toStream       ranges the moving node needs to stream out
+  * @param tmds           ring metadata pair; left is passed as the current ring, right as the updated ring
+  * @param expectedResult expected mapping to compare against
+  */
+ private void invokeCalculateRangesToStreamWithPreferredEndpoints(RangesAtEndpoint toStream,
+                                                                  Pair<TokenMetadata, TokenMetadata> tmds,
+                                                                  RangesByEndpoint expectedResult)
+ {
+     DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+
+     StorageService.RangeRelocator relocator = new StorageService.RangeRelocator();
+     TokenMetadata before = tmds.left;
+     TokenMetadata after = tmds.right;
+     AbstractReplicationStrategy strategy = simpleStrategy(before);
+
+     RangesByEndpoint result = relocator.calculateRangesToStreamWithEndpoints(toStream, strategy, before, after);
+
+     logger.info("Ranges to stream by endpoint");
+     logger.info(result.toString());
+     assertMultimapEqualsIgnoreOrder(expectedResult, result);
+ }
+
+ /**
+  * Asserts that {@code ranges} has as many entries as {@code replicas} and contains each of them,
+  * regardless of order.
+  *
+  * @param ranges   the collection under test
+  * @param replicas the replicas it is expected to contain
+  */
+ private static void assertContentsIgnoreOrder(RangesAtEndpoint ranges, Replica ... replicas)
+ {
+     // assertEquals takes (expected, actual): the expected size comes from the replicas argument
+     assertEquals(replicas.length, ranges.size());
+     for (Replica replica : replicas)
+     {
+         if (!ranges.contains(replica))
+         {
+             // Compare the whole collections so the failure message shows both sides in full
+             assertEquals(RangesAtEndpoint.of(replicas), ranges);
+         }
+     }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org