You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/01 19:20:40 UTC
svn commit: r959726 - in /cassandra/trunk:
src/java/org/apache/cassandra/locator/ test/conf/
test/unit/org/apache/cassandra/dht/ test/unit/org/apache/cassandra/locator/
test/unit/org/apache/cassandra/service/
Author: jbellis
Date: Thu Jul 1 17:20:39 2010
New Revision: 959726
URL: http://svn.apache.org/viewvc?rev=959726&view=rev
Log:
sanity-check replica count against number of nodes in the cluster. patch by mdennis; reviewed by jbellis for CASSANDRA-1191
Modified:
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
cassandra/trunk/test/conf/cassandra-rack.properties
cassandra/trunk/test/conf/cassandra.yaml
cassandra/trunk/test/conf/datacenters.properties
cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Thu Jul 1 17:20:39 2010
@@ -20,7 +20,10 @@
package org.apache.cassandra.locator;
import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,9 +71,11 @@ public abstract class AbstractReplicatio
* @param searchToken the token the natural endpoints are requested for
* @param table the table the natural endpoints are requested for
* @return a copy of the natural endpoints for the given token and table
+ * @throws IllegalStateException if the number of requested replicas is greater than the number of known endpoints
*/
- public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken, String table)
+ public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken, String table) throws IllegalStateException
{
+ int replicas = getReplicationFactor(table);
Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
EndpointCacheKey cacheKey = new EndpointCacheKey(table, keyToken);
ArrayList<InetAddress> endpoints = cachedEndpoints.get(cacheKey);
@@ -83,10 +88,23 @@ public abstract class AbstractReplicatio
cachedEndpoints.put(cacheKey, endpoints);
}
+ // calculateNaturalEndpoints should have checked this already; this is only a safety check
+ assert replicas <= endpoints.size();
+
return new ArrayList<InetAddress>(endpoints);
}
- public abstract Set<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, String table);
+ /**
+ * Calculate the natural endpoints for the given token, for the given table.
+ *
+ * @see #getNaturalEndpoints(org.apache.cassandra.dht.Token, String)
+ *
+ * @param searchToken the token the natural endpoints are requested for
+ * @param table the table the natural endpoints are requested for
+ * @return the natural endpoints calculated for the given token and table
+ * @throws IllegalStateException if the number of requested replicas is greater than the number of known endpoints
+ */
+ public abstract Set<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, String table) throws IllegalStateException;
public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints,
Multimap<InetAddress, InetAddress> hintedEndpoints,
@@ -95,7 +113,13 @@ public abstract class AbstractReplicatio
{
return new WriteResponseHandler(writeEndpoints, hintedEndpoints, consistencyLevel, table);
}
-
+
+ // instance method so test subclasses can override it
+ int getReplicationFactor(String table)
+ {
+ return DatabaseDescriptor.getReplicationFactor(table);
+ }
+
/**
* returns <tt>Multimap</tt> of {live destination: ultimate targets}, where if target is not the same
* as the destination, it is a "hinted" write, and will need to be sent to
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java Thu Jul 1 17:20:39 2010
@@ -94,7 +94,7 @@ public class DatacenterShardStrategy ext
public Set<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, String table)
{
- int totalReplicas = getReplicationfactor(table);
+ int totalReplicas = getReplicationFactor(table);
Map<String, Integer> remainingReplicas = new HashMap<String, Integer>(datacenters.get(table));
Map<String, Set<String>> dcUsedRacks = new HashMap<String, Set<String>>();
Set<InetAddress> endpoints = new HashSet<InetAddress>(totalReplicas);
@@ -125,7 +125,7 @@ public class DatacenterShardStrategy ext
}
}
- // 2nd pass: if replica count has not been achieved from unique racks, add nodes from the same racks
+ // second pass: if replica count has not been achieved from unique racks, add nodes from the same racks
for (Iterator<Token> iter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken);
endpoints.size() < totalReplicas && iter.hasNext();)
{
@@ -143,10 +143,16 @@ public class DatacenterShardStrategy ext
}
}
+ for (Map.Entry<String, Integer> entry : remainingReplicas.entrySet())
+ {
+ if (entry.getValue() > 0)
+ throw new IllegalStateException(String.format("datacenter (%s) has no more endpoints, (%s) replicas still needed", entry.getKey(), entry.getValue()));
+ }
+
return endpoints;
}
- public int getReplicationfactor(String table)
+ public int getReplicationFactor(String table)
{
int total = 0;
for (int repFactor : datacenters.get(table).values())
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Thu Jul 1 17:20:39 2010
@@ -46,7 +46,7 @@ public class RackAwareStrategy extends A
public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table)
{
- int replicas = DatabaseDescriptor.getReplicationFactor(table);
+ int replicas = getReplicationFactor(table);
Set<InetAddress> endpoints = new HashSet<InetAddress>(replicas);
ArrayList<Token> tokens = metadata.sortedTokens();
@@ -100,6 +100,9 @@ public class RackAwareStrategy extends A
if (!endpoints.contains(metadata.getEndpoint(t)))
endpoints.add(metadata.getEndpoint(t));
}
+
+ if (endpoints.size() < replicas)
+ throw new IllegalStateException(String.format("replication factor (%s) exceeds number of endpoints (%s)", replicas, endpoints.size()));
}
return endpoints;
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java Thu Jul 1 17:20:39 2010
@@ -31,7 +31,7 @@ import org.apache.cassandra.dht.Token;
/**
* This class returns the nodes responsible for a given
* key but does not respect rack awareness. Basically
- * returns the 3 nodes that lie right next to each other
+ * returns the RF nodes that lie right next to each other
* on the ring.
*/
public class RackUnawareStrategy extends AbstractReplicationStrategy
@@ -43,7 +43,7 @@ public class RackUnawareStrategy extends
public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table)
{
- int replicas = DatabaseDescriptor.getReplicationFactor(table);
+ int replicas = getReplicationFactor(table);
ArrayList<Token> tokens = metadata.sortedTokens();
Set<InetAddress> endpoints = new HashSet<InetAddress>(replicas);
@@ -57,6 +57,9 @@ public class RackUnawareStrategy extends
endpoints.add(metadata.getEndpoint(iter.next()));
}
+ if (endpoints.size() < replicas)
+ throw new IllegalStateException(String.format("replication factor (%s) exceeds number of endpoints (%s)", replicas, endpoints.size()));
+
return endpoints;
}
}
Modified: cassandra/trunk/test/conf/cassandra-rack.properties
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra-rack.properties?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra-rack.properties (original)
+++ cassandra/trunk/test/conf/cassandra-rack.properties Thu Jul 1 17:20:39 2010
@@ -33,10 +33,13 @@
10.20.114.15=DC2:RAC2
127.0.0.1=DC1:RAC1
-127.0.0.2=DC2:RAC2
-127.0.0.3=DC3:RAC3
-127.0.0.4=DC4:RAC4
-127.0.0.5=DC3:RAC3
+127.0.0.2=DC1:RAC2
+127.0.0.3=DC1:RAC3
+127.0.0.4=DC2:RAC4
+127.0.0.5=DC2:RAC5
+127.0.0.6=DC3:RAC6
+127.0.0.7=DC3:RAC7
+127.0.0.8=DC3:RAC8
# default for unknown nodes
default=DC1:r1
Modified: cassandra/trunk/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra.yaml (original)
+++ cassandra/trunk/test/conf/cassandra.yaml Thu Jul 1 17:20:39 2010
@@ -100,3 +100,10 @@ keyspaces:
- name: Super4
column_type: Super
compare_subcolumns_with: TimeUUIDType
+
+ - name: Keyspace5
+ replica_placement_strategy: org.apache.cassandra.locator.RackUnawareStrategy
+ replication_factor: 2
+ column_families:
+
+ - name: Standard1
Modified: cassandra/trunk/test/conf/datacenters.properties
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/datacenters.properties?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/test/conf/datacenters.properties (original)
+++ cassandra/trunk/test/conf/datacenters.properties Thu Jul 1 17:20:39 2010
@@ -22,5 +22,5 @@ Keyspace1\:DC1=3
Keyspace1\:DC2=2
Keyspace1\:DC3=1
Keyspace3\:DC1=3
-Keyspace3\:DC2=2
+Keyspace3\:DC2=1
Keyspace3\:DC3=1
\ No newline at end of file
Modified: cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Thu Jul 1 17:20:39 2010
@@ -54,29 +54,32 @@ public class BootStrapperTest extends Cl
{
StorageService ss = StorageService.instance;
- generateFakeEndpoints(3);
+ generateFakeEndpoints(5);
+
+ InetAddress two = InetAddress.getByName("127.0.0.2");
+ InetAddress three = InetAddress.getByName("127.0.0.3");
+ InetAddress four = InetAddress.getByName("127.0.0.4");
+ InetAddress five = InetAddress.getByName("127.0.0.5");
- InetAddress one = InetAddress.getByName("127.0.0.2");
- InetAddress two = InetAddress.getByName("127.0.0.3");
- InetAddress three = InetAddress.getByName("127.0.0.4");
Map<InetAddress, Double> load = new HashMap<InetAddress, Double>();
- load.put(one, 1.0);
load.put(two, 2.0);
load.put(three, 3.0);
+ load.put(four, 4.0);
+ load.put(five, 5.0);
TokenMetadata tmd = ss.getTokenMetadata();
InetAddress source = BootStrapper.getBootstrapSource(tmd, load);
- assert three.equals(source);
+ assert five.equals(source) : five + " != " + source;
InetAddress myEndpoint = InetAddress.getByName("127.0.0.1");
- Range range3 = ss.getPrimaryRangeForEndpoint(three);
- Token fakeToken = ((IPartitioner)StorageService.getPartitioner()).midpoint(range3.left, range3.right);
- assert range3.contains(fakeToken);
+ Range range5 = ss.getPrimaryRangeForEndpoint(five);
+ Token fakeToken = ((IPartitioner)StorageService.getPartitioner()).midpoint(range5.left, range5.right);
+ assert range5.contains(fakeToken);
ss.onChange(myEndpoint, StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_BOOTSTRAPPING + StorageService.Delimiter + ss.getPartitioner().getTokenFactory().toString(fakeToken)));
tmd = ss.getTokenMetadata();
- InetAddress source2 = BootStrapper.getBootstrapSource(tmd, load);
- assert two.equals(source2) : source2;
+ InetAddress source4 = BootStrapper.getBootstrapSource(tmd, load);
+ assert four.equals(source4) : four + " != " + source4;
}
@Test
Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java Thu Jul 1 17:20:39 2010
@@ -48,6 +48,12 @@ public class ReplicationStrategyEndpoint
tmd.updateNormalToken(new BigIntegerToken(String.valueOf(10)), InetAddress.getByName("127.0.0.1"));
tmd.updateNormalToken(new BigIntegerToken(String.valueOf(20)), InetAddress.getByName("127.0.0.2"));
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.3"));
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(40)), InetAddress.getByName("127.0.0.4"));
+ //tmd.updateNormalToken(new BigIntegerToken(String.valueOf(50)), InetAddress.getByName("127.0.0.5"));
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(60)), InetAddress.getByName("127.0.0.6"));
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(70)), InetAddress.getByName("127.0.0.7"));
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(80)), InetAddress.getByName("127.0.0.8"));
}
@Test
@@ -74,30 +80,35 @@ public class ReplicationStrategyEndpoint
public void runCacheRespectsTokenChangesTest(Class stratClass) throws Exception
{
- // TODO DSS is asked to provide a total of 6 replicas, but we never give it 6 endpoints.
- // thus we are testing undefined behavior, at best.
setup(stratClass);
+ ArrayList<InetAddress> initial;
ArrayList<InetAddress> endpoints;
endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
- assert endpoints.size() == 2 : StringUtils.join(endpoints, ",");
+ assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
- // test token addition
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.3"));
+ // test token addition, in DC2 before existing token
+ initial = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(35)), InetAddress.getByName("127.0.0.5"));
endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
- assert endpoints.size() == 3 : StringUtils.join(endpoints, ",");
+ assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
+ assert !endpoints.equals(initial);
- // test token removal
- tmd.removeEndpoint(InetAddress.getByName("127.0.0.2"));
+ // test token removal, newly created token
+ initial = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
+ tmd.removeEndpoint(InetAddress.getByName("127.0.0.5"));
endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
- assert endpoints.size() == 2 : StringUtils.join(endpoints, ",");
+ assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
+ assert !endpoints.contains(InetAddress.getByName("127.0.0.5"));
+ assert !endpoints.equals(initial);
// test token change
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.5"));
+ initial = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
+ //move .8 after search token but before other DC3
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(25)), InetAddress.getByName("127.0.0.8"));
endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
- assert endpoints.size() == 2 : StringUtils.join(endpoints, ",");
- assert endpoints.contains(InetAddress.getByName("127.0.0.5"));
- assert !endpoints.contains(InetAddress.getByName("127.0.0.3"));
+ assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
+ assert !endpoints.equals(initial);
}
protected static class FakeRackUnawareStrategy extends RackUnawareStrategy
Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Thu Jul 1 17:20:39 2010
@@ -65,7 +65,7 @@ public class AntiEntropyServiceTest exte
public static void prepareClass() throws Exception
{
LOCAL = FBUtilities.getLocalAddress();
- tablename = "Keyspace4";
+ tablename = "Keyspace5";
StorageService.instance.initServer();
// generate a fake endpoint for which we can spoof receiving/sending trees
REMOTE = InetAddress.getByName("127.0.0.2");
Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java Thu Jul 1 17:20:39 2010
@@ -36,6 +36,7 @@ import org.apache.cassandra.locator.Abst
import org.apache.cassandra.locator.RackUnawareStrategy;
import org.apache.cassandra.locator.SimpleSnitch;
import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.utils.Pair;
public class MoveTest extends CleanupHelper
{
@@ -48,17 +49,16 @@ public class MoveTest extends CleanupHel
return replacements;
}
-
/**
* Test whether write endpoints is correct when the node is leaving. Uses
* StorageService.onChange and does not manipulate token metadata directly.
*/
@Test
- public void testWriteEndpointsDuringLeave() throws UnknownHostException
+ public void newTestWriteEndpointsDuringLeave() throws Exception
{
StorageService ss = StorageService.instance;
- final int RING_SIZE = 5;
- final int LEAVING_NODE = 2;
+ final int RING_SIZE = 6;
+ final int LEAVING_NODE = 3;
TokenMetadata tmd = ss.getTokenMetadata();
tmd.clearUnsafe();
@@ -74,52 +74,47 @@ public class MoveTest extends CleanupHel
createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, RING_SIZE);
- final Map<String, List<Range>> deadNodesRanges = new HashMap<String, List<Range>>();
+ final Map<Pair<String, Token>, List<InetAddress>> expectedEndpoints = new HashMap<Pair<String, Token>, List<InetAddress>>();
for (String table : DatabaseDescriptor.getNonSystemTables())
{
- List<Range> list = new ArrayList<Range>();
- list.addAll(testStrategy.getAddressRanges(table).get(hosts.get(LEAVING_NODE)));
- Collections.sort(list);
- deadNodesRanges.put(table, list);
+ for (Token token : keyTokens)
+ {
+ List<InetAddress> endpoints = new ArrayList<InetAddress>();
+ Pair<String, Token> key = new Pair<String, Token>(table, token);
+ Iterator<Token> tokenIter = TokenMetadata.ringIterator(tmd.sortedTokens(), token);
+ while (tokenIter.hasNext())
+ {
+ endpoints.add(tmd.getEndpoint(tokenIter.next()));
+ }
+ expectedEndpoints.put(key, endpoints);
+ }
}
-
+
// Third node leaves
ss.onChange(hosts.get(LEAVING_NODE), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(LEAVING_NODE))));
-
- // check that it is correctly marked as leaving in tmd
assertTrue(tmd.isLeaving(hosts.get(LEAVING_NODE)));
- // check that pending ranges are correct (primary range should go to 1st node, first
- // replica range to 4th node and 2nd replica range to 5th node)
for (String table : DatabaseDescriptor.getNonSystemTables())
{
- int replicationFactor = DatabaseDescriptor.getReplicationFactor(table);
-
- // if the ring minus the leaving node leaves us with less than RF, we're hosed.
- if (hosts.size()-1 < replicationFactor)
- continue;
-
- // verify that the replicationFactor nodes after the leaving node are gatherings it's pending ranges.
- // in the case where rf==5, we're screwed because we basically just lost data.
- for (int i = 0; i < replicationFactor; i++)
+ for (Token token : keyTokens)
{
- assertTrue(tmd.getPendingRanges(table, hosts.get((LEAVING_NODE + 1 + i) % RING_SIZE)).size() > 0);
- assertEquals(tmd.getPendingRanges(table, hosts.get((LEAVING_NODE + 1 + i) % RING_SIZE)).get(0), deadNodesRanges.get(table).get(i));
- }
+ Pair<String, Token> key = new Pair<String, Token>(table, token);
+ int replicationFactor = DatabaseDescriptor.getReplicationFactor(table);
- // note that we're iterating over nodes and sample tokens.
- final int replicaStart = (LEAVING_NODE-replicationFactor+RING_SIZE)%RING_SIZE;
- for (int i=0; i<keyTokens.size(); ++i)
- {
- Collection<InetAddress> endpoints = tmd.getWriteEndpoints(keyTokens.get(i), table, testStrategy.getNaturalEndpoints(keyTokens.get(i), table));
- // figure out if this node is one of the nodes previous to the failed node (2).
- boolean isReplica = (i - replicaStart + RING_SIZE) % RING_SIZE < replicationFactor;
- // the preceeding leaving_node-replication_factor nodes should have and additional ep (replication_factor+1);
- if (isReplica)
- assertTrue(endpoints.size() == replicationFactor + 1);
- else
- assertTrue(endpoints.size() == replicationFactor);
+ HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(token, table, testStrategy.calculateNaturalEndpoints(token, tmd, table)));
+ HashSet<InetAddress> expected = new HashSet<InetAddress>();
+ for (int i = 0; i < replicationFactor; i++)
+ {
+ expected.add(expectedEndpoints.get(key).get(i));
+ }
+
+ // if the leaving node is in the endpoint list,
+ // then we should expect it plus one extra for when it's gone
+ if (expected.contains(hosts.get(LEAVING_NODE)))
+ expected.add(expectedEndpoints.get(key).get(replicationFactor));
+
+ assertEquals("mismatched endpoint sets", expected, actual);
}
}
@@ -152,7 +147,7 @@ public class MoveTest extends CleanupHel
createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, RING_SIZE);
// nodes 6, 8 and 9 leave
- final int[] LEAVING = new int[] { 6, 8, 9};
+ final int[] LEAVING = new int[] {6, 8, 9};
for (int leaving : LEAVING)
ss.onChange(hosts.get(leaving), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(leaving))));
@@ -164,6 +159,9 @@ public class MoveTest extends CleanupHel
Collection<InetAddress> endpoints = null;
+ /* don't require test update every time a new keyspace is added to test/conf/cassandra.yaml */
+ List<String> tables = Arrays.asList("Keyspace1", "Keyspace2", "Keyspace3", "Keyspace4");
+
// pre-calculate the results.
Map<String, Multimap<Token, InetAddress>> expectedEndpoints = new HashMap<String, Multimap<Token, InetAddress>>();
expectedEndpoints.put("Keyspace1", HashMultimap.<Token, InetAddress>create());
@@ -211,7 +209,7 @@ public class MoveTest extends CleanupHel
expectedEndpoints.get("Keyspace4").putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.0.3"));
expectedEndpoints.get("Keyspace4").putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.1", "127.0.0.2", "127.0.0.3"));
- for (String table : DatabaseDescriptor.getNonSystemTables())
+ for (String table : tables)
{
for (int i = 0; i < keyTokens.size(); i++)
{
@@ -326,7 +324,7 @@ public class MoveTest extends CleanupHel
expectedEndpoints.get("Keyspace4").get(new BigIntegerToken("75")).removeAll(makeAddrs("127.0.0.10"));
expectedEndpoints.get("Keyspace4").get(new BigIntegerToken("85")).removeAll(makeAddrs("127.0.0.10"));
- for (String table : DatabaseDescriptor.getNonSystemTables())
+ for (String table : tables)
{
for (int i = 0; i < keyTokens.size(); i++)
{
@@ -425,7 +423,7 @@ public class MoveTest extends CleanupHel
List<InetAddress> hosts = new ArrayList<InetAddress>();
// create a ring or 5 nodes
- createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 5);
+ createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 7);
// node 2 leaves
ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(2))));
@@ -494,7 +492,7 @@ public class MoveTest extends CleanupHel
List<InetAddress> hosts = new ArrayList<InetAddress>();
// create a ring or 5 nodes
- createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 5);
+ createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 6);
// node 2 leaves
ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(2))));
@@ -538,7 +536,7 @@ public class MoveTest extends CleanupHel
List<InetAddress> hosts = new ArrayList<InetAddress>();
// create a ring or 5 nodes
- createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 5);
+ createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 6);
// node 2 leaves with _different_ token
ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(0))));
@@ -587,8 +585,8 @@ public class MoveTest extends CleanupHel
ArrayList<Token> keyTokens = new ArrayList<Token>();
List<InetAddress> hosts = new ArrayList<InetAddress>();
- // create a ring or 5 nodes
- createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 5);
+ // create a ring of 7 nodes
+ createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 7);
// node hosts.get(2) goes jumps to left
ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter + StorageService.LEFT_NORMALLY + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(2))));