You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/01 19:20:40 UTC

svn commit: r959726 - in /cassandra/trunk: src/java/org/apache/cassandra/locator/ test/conf/ test/unit/org/apache/cassandra/dht/ test/unit/org/apache/cassandra/locator/ test/unit/org/apache/cassandra/service/

Author: jbellis
Date: Thu Jul  1 17:20:39 2010
New Revision: 959726

URL: http://svn.apache.org/viewvc?rev=959726&view=rev
Log:
sanity-check replica count against number of nodes in the cluster.  patch by mdennis; reviewed by jbellis for CASSANDRA-1191

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
    cassandra/trunk/test/conf/cassandra-rack.properties
    cassandra/trunk/test/conf/cassandra.yaml
    cassandra/trunk/test/conf/datacenters.properties
    cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Thu Jul  1 17:20:39 2010
@@ -20,7 +20,10 @@
 package org.apache.cassandra.locator;
 
 import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,9 +71,11 @@ public abstract class AbstractReplicatio
      * @param searchToken the token the natural endpoints are requested for
      * @param table the table the natural endpoints are requested for
      * @return a copy of the natural endpoints for the given token and table
+     * @throws IllegalStateException if the number of requested replicas is greater than the number of known endpoints
      */
-    public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken, String table)
+    public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken, String table) throws IllegalStateException
     {
+        int replicas = getReplicationFactor(table);
         Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
         EndpointCacheKey cacheKey = new EndpointCacheKey(table, keyToken);
         ArrayList<InetAddress> endpoints = cachedEndpoints.get(cacheKey);
@@ -83,10 +88,23 @@ public abstract class AbstractReplicatio
             cachedEndpoints.put(cacheKey, endpoints);
         }
 
+        // calculateNaturalEndpoints should have checked this already; this is just a safety check
+        assert replicas <= endpoints.size();
+
         return new ArrayList<InetAddress>(endpoints);
     }
 
-    public abstract Set<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, String table);
+    /**
+     * calculate the natural endpoints for the given token, for the given table.
+     *
+     * @see #getNaturalEndpoints(org.apache.cassandra.dht.Token, String)
+     *
+     * @param searchToken the token the natural endpoints are requested for
+     * @param table the table the natural endpoints are requested for
+     * @return the set of natural endpoints calculated for the given token and table
+     * @throws IllegalStateException if the number of requested replicas is greater than the number of known endpoints
+     */
+    public abstract Set<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, String table) throws IllegalStateException;
 
     public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints,
                                                                 Multimap<InetAddress, InetAddress> hintedEndpoints,
@@ -95,7 +113,13 @@ public abstract class AbstractReplicatio
     {
         return new WriteResponseHandler(writeEndpoints, hintedEndpoints, consistencyLevel, table);
     }
-    
+
+    // instance method so test subclasses can override it
+    int getReplicationFactor(String table)
+    {
+       return DatabaseDescriptor.getReplicationFactor(table);
+    }
+
     /**
      * returns <tt>Multimap</tt> of {live destination: ultimate targets}, where if target is not the same
      * as the destination, it is a "hinted" write, and will need to be sent to

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java Thu Jul  1 17:20:39 2010
@@ -94,7 +94,7 @@ public class DatacenterShardStrategy ext
 
     public Set<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, String table)
     {
-        int totalReplicas = getReplicationfactor(table);
+        int totalReplicas = getReplicationFactor(table);
         Map<String, Integer> remainingReplicas = new HashMap<String, Integer>(datacenters.get(table));
         Map<String, Set<String>> dcUsedRacks = new HashMap<String, Set<String>>();
         Set<InetAddress> endpoints = new HashSet<InetAddress>(totalReplicas);
@@ -125,7 +125,7 @@ public class DatacenterShardStrategy ext
             }
         }
 
-        // 2nd pass: if replica count has not been achieved from unique racks, add nodes from the same racks
+        // second pass: if replica count has not been achieved from unique racks, add nodes from the same racks
         for (Iterator<Token> iter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken);
              endpoints.size() < totalReplicas && iter.hasNext();)
         {
@@ -143,10 +143,16 @@ public class DatacenterShardStrategy ext
             }
         }
 
+        for (Map.Entry<String, Integer> entry : remainingReplicas.entrySet())
+        {
+            if (entry.getValue() > 0)
+                throw new IllegalStateException(String.format("datacenter (%s) has no more endpoints, (%s) replicas still needed", entry.getKey(), entry.getValue()));
+        }
+
         return endpoints;
     }
 
-    public int getReplicationfactor(String table)
+    public int getReplicationFactor(String table)
     {
         int total = 0;
         for (int repFactor : datacenters.get(table).values())

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Thu Jul  1 17:20:39 2010
@@ -46,7 +46,7 @@ public class RackAwareStrategy extends A
 
     public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table)
     {
-        int replicas = DatabaseDescriptor.getReplicationFactor(table);
+        int replicas = getReplicationFactor(table);
         Set<InetAddress> endpoints = new HashSet<InetAddress>(replicas);
         ArrayList<Token> tokens = metadata.sortedTokens();
 
@@ -100,6 +100,9 @@ public class RackAwareStrategy extends A
                 if (!endpoints.contains(metadata.getEndpoint(t)))
                     endpoints.add(metadata.getEndpoint(t));
             }
+
+            if (endpoints.size() < replicas)
+                throw new IllegalStateException(String.format("replication factor (%s) exceeds number of endpoints (%s)", replicas, endpoints.size()));
         }
 
         return endpoints;

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java Thu Jul  1 17:20:39 2010
@@ -31,7 +31,7 @@ import org.apache.cassandra.dht.Token;
 /**
  * This class returns the nodes responsible for a given
  * key but does not respect rack awareness. Basically
- * returns the 3 nodes that lie right next to each other
+ * returns the RF nodes that lie right next to each other
  * on the ring.
  */
 public class RackUnawareStrategy extends AbstractReplicationStrategy
@@ -43,7 +43,7 @@ public class RackUnawareStrategy extends
 
     public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table)
     {
-        int replicas = DatabaseDescriptor.getReplicationFactor(table);
+        int replicas = getReplicationFactor(table);
         ArrayList<Token> tokens = metadata.sortedTokens();
         Set<InetAddress> endpoints = new HashSet<InetAddress>(replicas);
 
@@ -57,6 +57,9 @@ public class RackUnawareStrategy extends
             endpoints.add(metadata.getEndpoint(iter.next()));
         }
 
+        if (endpoints.size() < replicas)
+            throw new IllegalStateException(String.format("replication factor (%s) exceeds number of endpoints (%s)", replicas, endpoints.size()));
+        
         return endpoints;
     }
 }

Modified: cassandra/trunk/test/conf/cassandra-rack.properties
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra-rack.properties?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra-rack.properties (original)
+++ cassandra/trunk/test/conf/cassandra-rack.properties Thu Jul  1 17:20:39 2010
@@ -33,10 +33,13 @@
 10.20.114.15=DC2:RAC2
 
 127.0.0.1=DC1:RAC1
-127.0.0.2=DC2:RAC2
-127.0.0.3=DC3:RAC3
-127.0.0.4=DC4:RAC4
-127.0.0.5=DC3:RAC3
+127.0.0.2=DC1:RAC2
+127.0.0.3=DC1:RAC3
+127.0.0.4=DC2:RAC4
+127.0.0.5=DC2:RAC5
+127.0.0.6=DC3:RAC6
+127.0.0.7=DC3:RAC7
+127.0.0.8=DC3:RAC8
 
 # default for unknown nodes
 default=DC1:r1

Modified: cassandra/trunk/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra.yaml (original)
+++ cassandra/trunk/test/conf/cassandra.yaml Thu Jul  1 17:20:39 2010
@@ -100,3 +100,10 @@ keyspaces:
         - name: Super4
           column_type: Super
           compare_subcolumns_with: TimeUUIDType
+
+    - name: Keyspace5
+      replica_placement_strategy: org.apache.cassandra.locator.RackUnawareStrategy
+      replication_factor: 2
+      column_families:
+
+        - name: Standard1

Modified: cassandra/trunk/test/conf/datacenters.properties
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/datacenters.properties?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/test/conf/datacenters.properties (original)
+++ cassandra/trunk/test/conf/datacenters.properties Thu Jul  1 17:20:39 2010
@@ -22,5 +22,5 @@ Keyspace1\:DC1=3
 Keyspace1\:DC2=2
 Keyspace1\:DC3=1
 Keyspace3\:DC1=3
-Keyspace3\:DC2=2
+Keyspace3\:DC2=1
 Keyspace3\:DC3=1
\ No newline at end of file

Modified: cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Thu Jul  1 17:20:39 2010
@@ -54,29 +54,32 @@ public class BootStrapperTest extends Cl
     {
         StorageService ss = StorageService.instance;
 
-        generateFakeEndpoints(3);
+        generateFakeEndpoints(5);
+
+        InetAddress two = InetAddress.getByName("127.0.0.2");
+        InetAddress three = InetAddress.getByName("127.0.0.3");
+        InetAddress four = InetAddress.getByName("127.0.0.4");
+        InetAddress five = InetAddress.getByName("127.0.0.5");
 
-        InetAddress one = InetAddress.getByName("127.0.0.2");
-        InetAddress two = InetAddress.getByName("127.0.0.3");
-        InetAddress three = InetAddress.getByName("127.0.0.4");
         Map<InetAddress, Double> load = new HashMap<InetAddress, Double>();
-        load.put(one, 1.0);
         load.put(two, 2.0);
         load.put(three, 3.0);
+        load.put(four, 4.0);
+        load.put(five, 5.0);
 
         TokenMetadata tmd = ss.getTokenMetadata();
         InetAddress source = BootStrapper.getBootstrapSource(tmd, load);
-        assert three.equals(source);
+        assert five.equals(source) : five + " != " + source;
 
         InetAddress myEndpoint = InetAddress.getByName("127.0.0.1");
-        Range range3 = ss.getPrimaryRangeForEndpoint(three);
-        Token fakeToken = ((IPartitioner)StorageService.getPartitioner()).midpoint(range3.left, range3.right);
-        assert range3.contains(fakeToken);
+        Range range5 = ss.getPrimaryRangeForEndpoint(five);
+        Token fakeToken = ((IPartitioner)StorageService.getPartitioner()).midpoint(range5.left, range5.right);
+        assert range5.contains(fakeToken);
         ss.onChange(myEndpoint, StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_BOOTSTRAPPING + StorageService.Delimiter + ss.getPartitioner().getTokenFactory().toString(fakeToken)));
         tmd = ss.getTokenMetadata();
 
-        InetAddress source2 = BootStrapper.getBootstrapSource(tmd, load);
-        assert two.equals(source2) : source2;
+        InetAddress source4 = BootStrapper.getBootstrapSource(tmd, load);
+        assert four.equals(source4) : four + " != " + source4;
     }
 
     @Test

Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java Thu Jul  1 17:20:39 2010
@@ -48,6 +48,12 @@ public class ReplicationStrategyEndpoint
 
         tmd.updateNormalToken(new BigIntegerToken(String.valueOf(10)), InetAddress.getByName("127.0.0.1"));
         tmd.updateNormalToken(new BigIntegerToken(String.valueOf(20)), InetAddress.getByName("127.0.0.2"));
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.3"));
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(40)), InetAddress.getByName("127.0.0.4"));
+        //tmd.updateNormalToken(new BigIntegerToken(String.valueOf(50)), InetAddress.getByName("127.0.0.5"));
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(60)), InetAddress.getByName("127.0.0.6"));
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(70)), InetAddress.getByName("127.0.0.7"));
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(80)), InetAddress.getByName("127.0.0.8"));
     }
 
     @Test
@@ -74,30 +80,35 @@ public class ReplicationStrategyEndpoint
 
     public void runCacheRespectsTokenChangesTest(Class stratClass) throws Exception
     {
-        // TODO DSS is asked to provide a total of 6 replicas, but we never give it 6 endpoints.
-        // thus we are testing undefined behavior, at best.
         setup(stratClass);
+        ArrayList<InetAddress> initial;
         ArrayList<InetAddress> endpoints;
 
         endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
-        assert endpoints.size() == 2 : StringUtils.join(endpoints, ",");
+        assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
 
-        // test token addition
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.3"));
+        // test token addition, in DC2 before existing token
+        initial = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(35)), InetAddress.getByName("127.0.0.5"));
         endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
-        assert endpoints.size() == 3 : StringUtils.join(endpoints, ",");
+        assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
+        assert !endpoints.equals(initial);
 
-        // test token removal
-        tmd.removeEndpoint(InetAddress.getByName("127.0.0.2"));
+        // test token removal, newly created token
+        initial = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
+        tmd.removeEndpoint(InetAddress.getByName("127.0.0.5"));
         endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
-        assert endpoints.size() == 2 : StringUtils.join(endpoints, ",");
+        assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
+        assert !endpoints.contains(InetAddress.getByName("127.0.0.5"));
+        assert !endpoints.equals(initial);
 
         // test token change
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.5"));
+        initial = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
+        //move .8 after search token but before other DC3
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(25)), InetAddress.getByName("127.0.0.8"));
         endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
-        assert endpoints.size() == 2 : StringUtils.join(endpoints, ",");
-        assert endpoints.contains(InetAddress.getByName("127.0.0.5"));
-        assert !endpoints.contains(InetAddress.getByName("127.0.0.3"));
+        assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
+        assert !endpoints.equals(initial);        
     }
 
     protected static class FakeRackUnawareStrategy extends RackUnawareStrategy

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Thu Jul  1 17:20:39 2010
@@ -65,7 +65,7 @@ public class AntiEntropyServiceTest exte
     public static void prepareClass() throws Exception
     {
         LOCAL = FBUtilities.getLocalAddress();
-        tablename = "Keyspace4";
+        tablename = "Keyspace5";
         StorageService.instance.initServer();
         // generate a fake endpoint for which we can spoof receiving/sending trees
         REMOTE = InetAddress.getByName("127.0.0.2");

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java?rev=959726&r1=959725&r2=959726&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java Thu Jul  1 17:20:39 2010
@@ -36,6 +36,7 @@ import org.apache.cassandra.locator.Abst
 import org.apache.cassandra.locator.RackUnawareStrategy;
 import org.apache.cassandra.locator.SimpleSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.utils.Pair;
 
 public class MoveTest extends CleanupHelper
 {
@@ -48,17 +49,16 @@ public class MoveTest extends CleanupHel
         return replacements;
     }
 
-
     /**
      * Test whether write endpoints is correct when the node is leaving. Uses
      * StorageService.onChange and does not manipulate token metadata directly.
      */
     @Test
-    public void testWriteEndpointsDuringLeave() throws UnknownHostException
+    public void newTestWriteEndpointsDuringLeave() throws Exception
     {
         StorageService ss = StorageService.instance;
-        final int RING_SIZE = 5;
-        final int LEAVING_NODE = 2;
+        final int RING_SIZE = 6;
+        final int LEAVING_NODE = 3;
 
         TokenMetadata tmd = ss.getTokenMetadata();
         tmd.clearUnsafe();
@@ -74,52 +74,47 @@ public class MoveTest extends CleanupHel
 
         createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, RING_SIZE);
 
-        final Map<String, List<Range>> deadNodesRanges = new HashMap<String, List<Range>>();
+        final Map<Pair<String, Token>, List<InetAddress>> expectedEndpoints = new HashMap<Pair<String, Token>, List<InetAddress>>();
         for (String table : DatabaseDescriptor.getNonSystemTables())
         {
-            List<Range> list = new ArrayList<Range>();
-            list.addAll(testStrategy.getAddressRanges(table).get(hosts.get(LEAVING_NODE)));
-            Collections.sort(list);
-            deadNodesRanges.put(table, list);
+            for (Token token : keyTokens)
+            {
+                List<InetAddress> endpoints = new ArrayList<InetAddress>();
+                Pair<String, Token> key = new Pair<String, Token>(table, token);
+                Iterator<Token> tokenIter = TokenMetadata.ringIterator(tmd.sortedTokens(), token);
+                while (tokenIter.hasNext())
+                {
+                    endpoints.add(tmd.getEndpoint(tokenIter.next()));
+                }
+                expectedEndpoints.put(key, endpoints);
+            }
         }
-        
+
         // Third node leaves
         ss.onChange(hosts.get(LEAVING_NODE), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(LEAVING_NODE))));
-
-        // check that it is correctly marked as leaving in tmd
         assertTrue(tmd.isLeaving(hosts.get(LEAVING_NODE)));
 
-        // check that pending ranges are correct (primary range should go to 1st node, first
-        // replica range to 4th node and 2nd replica range to 5th node)
         for (String table : DatabaseDescriptor.getNonSystemTables())
         {
-            int replicationFactor = DatabaseDescriptor.getReplicationFactor(table);
-
-            // if the ring minus the leaving node leaves us with less than RF, we're hosed.
-            if (hosts.size()-1 < replicationFactor)
-                continue;
-            
-            // verify that the replicationFactor nodes after the leaving node are gatherings it's pending ranges.
-            // in the case where rf==5, we're screwed because we basically just lost data.
-            for (int i = 0; i < replicationFactor; i++)
+            for (Token token : keyTokens)
             {
-                assertTrue(tmd.getPendingRanges(table, hosts.get((LEAVING_NODE + 1 + i) % RING_SIZE)).size() > 0);
-                assertEquals(tmd.getPendingRanges(table, hosts.get((LEAVING_NODE + 1 + i) % RING_SIZE)).get(0), deadNodesRanges.get(table).get(i));
-            }
+                Pair<String, Token> key = new Pair<String, Token>(table, token);
+                int replicationFactor = DatabaseDescriptor.getReplicationFactor(table);
 
-            // note that we're iterating over nodes and sample tokens.
-            final int replicaStart = (LEAVING_NODE-replicationFactor+RING_SIZE)%RING_SIZE;
-            for (int i=0; i<keyTokens.size(); ++i)
-            {
-                Collection<InetAddress> endpoints = tmd.getWriteEndpoints(keyTokens.get(i), table, testStrategy.getNaturalEndpoints(keyTokens.get(i), table));
-                // figure out if this node is one of the nodes previous to the failed node (2).
-                boolean isReplica = (i - replicaStart + RING_SIZE) % RING_SIZE < replicationFactor;
-                // the preceeding leaving_node-replication_factor nodes should have and additional ep (replication_factor+1);
-                if (isReplica)
-                    assertTrue(endpoints.size() == replicationFactor + 1);
-                else
-                    assertTrue(endpoints.size() == replicationFactor);
+                HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(token, table, testStrategy.calculateNaturalEndpoints(token, tmd, table)));
+                HashSet<InetAddress> expected = new HashSet<InetAddress>();
 
+                for (int i = 0; i < replicationFactor; i++)
+                {
+                    expected.add(expectedEndpoints.get(key).get(i));
+                }
+
+                // if the leaving node is in the endpoint list,
+                // then we should expect it plus one extra for when it's gone
+                if (expected.contains(hosts.get(LEAVING_NODE)))
+                    expected.add(expectedEndpoints.get(key).get(replicationFactor));
+
+                assertEquals("mismatched endpoint sets", expected, actual);
             }
         }
 
@@ -152,7 +147,7 @@ public class MoveTest extends CleanupHel
         createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, RING_SIZE);
 
         // nodes 6, 8 and 9 leave
-        final int[] LEAVING = new int[] { 6, 8, 9};
+        final int[] LEAVING = new int[] {6, 8, 9};
         for (int leaving : LEAVING)
             ss.onChange(hosts.get(leaving), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(leaving))));
         
@@ -164,6 +159,9 @@ public class MoveTest extends CleanupHel
 
         Collection<InetAddress> endpoints = null;
 
+        /* don't require test update every time a new keyspace is added to test/conf/cassandra.yaml */
+        List<String> tables = Arrays.asList("Keyspace1", "Keyspace2", "Keyspace3", "Keyspace4");
+
         // pre-calculate the results.
         Map<String, Multimap<Token, InetAddress>> expectedEndpoints = new HashMap<String, Multimap<Token, InetAddress>>();
         expectedEndpoints.put("Keyspace1", HashMultimap.<Token, InetAddress>create());
@@ -211,7 +209,7 @@ public class MoveTest extends CleanupHel
         expectedEndpoints.get("Keyspace4").putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.0.3"));
         expectedEndpoints.get("Keyspace4").putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.1", "127.0.0.2", "127.0.0.3"));
 
-        for (String table : DatabaseDescriptor.getNonSystemTables())
+        for (String table : tables)
         {
             for (int i = 0; i < keyTokens.size(); i++)
             {
@@ -326,7 +324,7 @@ public class MoveTest extends CleanupHel
         expectedEndpoints.get("Keyspace4").get(new BigIntegerToken("75")).removeAll(makeAddrs("127.0.0.10"));
         expectedEndpoints.get("Keyspace4").get(new BigIntegerToken("85")).removeAll(makeAddrs("127.0.0.10"));
 
-        for (String table : DatabaseDescriptor.getNonSystemTables())
+        for (String table : tables)
         {
             for (int i = 0; i < keyTokens.size(); i++)
             {
@@ -425,7 +423,7 @@ public class MoveTest extends CleanupHel
         List<InetAddress> hosts = new ArrayList<InetAddress>();
 
         // create a ring or 5 nodes
-        createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 5);
+        createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 7);
 
         // node 2 leaves
         ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(2))));
@@ -494,7 +492,7 @@ public class MoveTest extends CleanupHel
         List<InetAddress> hosts = new ArrayList<InetAddress>();
 
         // create a ring or 5 nodes
-        createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 5);
+        createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 6);
 
         // node 2 leaves
         ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(2))));
@@ -538,7 +536,7 @@ public class MoveTest extends CleanupHel
         List<InetAddress> hosts = new ArrayList<InetAddress>();
 
         // create a ring or 5 nodes
-        createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 5);
+        createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 6);
 
         // node 2 leaves with _different_ token
         ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(0))));
@@ -587,8 +585,8 @@ public class MoveTest extends CleanupHel
         ArrayList<Token> keyTokens = new ArrayList<Token>();
         List<InetAddress> hosts = new ArrayList<InetAddress>();
 
-        // create a ring or 5 nodes
-        createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 5);
+        // create a ring of 7 nodes
+        createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 7);
 
         // node hosts.get(2) goes jumps to left
         ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter + StorageService.LEFT_NORMALLY + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(2))));