You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2010/02/04 16:21:33 UTC

svn commit: r906521 [2/3] - in /incubator/cassandra/trunk: ./ conf/ contrib/circuit/src/org/apache/cassandra/contrib/circuit/ src/java/org/apache/cassandra/client/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apa...

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Feb  4 15:21:31 2010
@@ -23,7 +23,11 @@
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
 import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.net.InetAddress;
@@ -109,9 +113,9 @@
         return partitioner_;
     }
 
-    public Collection<Range> getLocalRanges()
+    public Collection<Range> getLocalRanges(String table)
     {
-        return getRangesForEndPoint(FBUtilities.getLocalAddress());
+        return getRangesForEndPoint(table, FBUtilities.getLocalAddress());
     }
 
     public Range getLocalPrimaryRange()
@@ -119,14 +123,6 @@
         return getPrimaryRangeForEndPoint(FBUtilities.getLocalAddress());
     }
 
-    /*
-     * This is the endpoint snitch which depends on the network architecture. We
-     * need to keep this information for each endpoint so that we make decisions
-     * while doing things like replication etc.
-     *
-     */
-    private IEndPointSnitch endPointSnitch_;
-
     /* This abstraction maintains the token/endpoint metadata information */
     private TokenMetadata tokenMetadata_ = new TokenMetadata();
     private SystemTable.StorageMetadata storageMetadata_;
@@ -140,20 +136,21 @@
                                                                                    new NamedThreadFactory("CONSISTENCY-MANAGER"));
 
     /* We use this interface to determine where replicas need to be placed */
-    private AbstractReplicationStrategy replicationStrategy_;
+    private Map<String, AbstractReplicationStrategy> replicationStrategies = new HashMap<String, AbstractReplicationStrategy>();
+
     /* Are we starting this node in bootstrap mode? */
     private boolean isBootstrapMode;
     private Multimap<InetAddress, String> bootstrapSet;
     /* when intialized as a client, we shouldn't write to the system table. */
     private boolean isClientMode;
-  
+
     public synchronized void addBootstrapSource(InetAddress s, String table)
     {
         if (logger_.isDebugEnabled())
-            logger_.debug("Added " + s + " as a bootstrap source");
+            logger_.debug(String.format("Added %s/%s as a bootstrap source", s, table));
         bootstrapSet.put(s, table);
     }
-    
+
     public synchronized void removeBootstrapSource(InetAddress s, String table)
     {
         if (table == null)
@@ -161,7 +158,7 @@
         else
             bootstrapSet.remove(s, table);
         if (logger_.isDebugEnabled())
-            logger_.debug("Removed " + s + " as a bootstrap source; remaining is [" + StringUtils.join(bootstrapSet.keySet(), ", ") + "]");
+            logger_.debug(String.format("Removed %s/%s as a bootstrap source; remaining is [%s]", s, table == null ? "<ALL>" : table, StringUtils.join(bootstrapSet.keySet(), ", ")));
 
         if (bootstrapSet.isEmpty())
         {
@@ -200,7 +197,6 @@
         }
 
         bootstrapSet = HashMultimap.create();
-        endPointSnitch_ = DatabaseDescriptor.getEndPointSnitch();
 
         /* register the verb handlers */
         MessagingService.instance.registerVerbHandlers(Verb.BINARY, new BinaryVerbHandler());
@@ -222,19 +218,30 @@
         MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new Gossiper.GossipDigestSynVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new Gossiper.GossipDigestAckVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new Gossiper.GossipDigestAck2VerbHandler());
+    }
 
-        replicationStrategy_ = getReplicationStrategy(tokenMetadata_);
+    public synchronized AbstractReplicationStrategy getReplicationStrategy(String table)
+    {
+        AbstractReplicationStrategy strat = replicationStrategies.get(table);
+        if (strat == null)
+        {
+            strat = StorageService.getReplicationStrategy(tokenMetadata_, table);
+            replicationStrategies.put(table, strat);
+        }
+        return strat;
     }
 
-    public static AbstractReplicationStrategy getReplicationStrategy(TokenMetadata tokenMetadata)
+    public static AbstractReplicationStrategy getReplicationStrategy(TokenMetadata tokenMetadata, String table)
     {
         AbstractReplicationStrategy replicationStrategy = null;
-        Class<AbstractReplicationStrategy> cls = DatabaseDescriptor.getReplicaPlacementStrategyClass();
-        Class [] parameterTypes = new Class[] { TokenMetadata.class, int.class};
+        Class<AbstractReplicationStrategy> cls = DatabaseDescriptor.getReplicaPlacementStrategyClass(table);
+        if (cls == null)
+            throw new RuntimeException(String.format("No replica strategy configured for %s", table));
+        Class [] parameterTypes = new Class[] { TokenMetadata.class, IEndPointSnitch.class};
         try
         {
             Constructor<AbstractReplicationStrategy> constructor = cls.getConstructor(parameterTypes);
-            replicationStrategy = constructor.newInstance(tokenMetadata, DatabaseDescriptor.getReplicationFactor());
+            replicationStrategy = constructor.newInstance(tokenMetadata, DatabaseDescriptor.getEndPointSnitch(table));
         }
         catch (Exception e)
         {
@@ -258,7 +265,7 @@
         Gossiper.instance.register(this);
         Gossiper.instance.start(FBUtilities.getLocalAddress(), (int)(System.currentTimeMillis() / 1000)); // needed for node-ring gathering.
     }
-    
+
     public void initServer() throws IOException
     {
         isClientMode = false;
@@ -327,7 +334,7 @@
         {
             throw new AssertionError(e);
         }
-        new BootStrapper(replicationStrategy_, FBUtilities.getLocalAddress(), token, tokenMetadata_).startBootstrap(); // handles token update
+        new BootStrapper(FBUtilities.getLocalAddress(), token, tokenMetadata_).startBootstrap(); // handles token update
     }
 
     public boolean isBootstrapMode()
@@ -339,12 +346,7 @@
     {
         return tokenMetadata_;
     }
-
-    public IEndPointSnitch getEndPointSnitch()
-    {
-        return endPointSnitch_;
-    }
-
+    
     /**
      * This method performs the requisite operations to make
      * sure that the N replicas are in sync. We do this in the
@@ -355,12 +357,28 @@
         consistencyManager_.submit(new ConsistencyManager(command.table, row, endpoints, command));
     }
 
-    public Map<Range, List<String>> getRangeToEndPointMap()
+    /**
+     * for a keyspace, return the ranges and corresponding hosts for a given keyspace.
+     * @param keyspace
+     * @return
+     */
+    public Map<Range, List<String>> getRangeToEndPointMap(String keyspace)
     {
+        // some people just want to get a visual representation of things. Allow null and set it to the first
+        // non-system table.
+        if (keyspace == null)
+        {
+            for (String ks : DatabaseDescriptor.getNonSystemTables())
+            {
+                keyspace = ks;
+                break;
+            }
+        }
+
         /* All the ranges for the tokens */
         List<Range> ranges = getAllRanges(tokenMetadata_.sortedTokens());
         Map<Range, List<String>> map = new HashMap<Range, List<String>>();
-        for (Map.Entry<Range,List<InetAddress>> entry : constructRangeToEndPointMap(ranges).entrySet())
+        for (Map.Entry<Range,List<InetAddress>> entry : constructRangeToEndPointMap(keyspace, ranges, keyspace).entrySet())
         {
             map.put(entry.getKey(), stringify(entry.getValue()));
         }
@@ -368,17 +386,17 @@
     }
 
     /**
-     * Construct the range to endpoint mapping based on the true view 
-     * of the world. 
+     * Construct the range to endpoint mapping based on the true view
+     * of the world.
      * @param ranges
      * @return mapping of ranges to the replicas responsible for them.
     */
-    public Map<Range, List<InetAddress>> constructRangeToEndPointMap(List<Range> ranges)
+    private Map<Range, List<InetAddress>> constructRangeToEndPointMap(String keyspace, List<Range> ranges, String table)
     {
         Map<Range, List<InetAddress>> rangeToEndPointMap = new HashMap<Range, List<InetAddress>>();
         for (Range range : ranges)
         {
-            rangeToEndPointMap.put(range, replicationStrategy_.getNaturalEndpoints(range.right));
+            rangeToEndPointMap.put(range, getReplicationStrategy(keyspace).getNaturalEndpoints(range.right, table));
         }
         return rangeToEndPointMap;
     }
@@ -593,12 +611,14 @@
      */
     private void calculatePendingRanges()
     {
-        calculatePendingRanges(tokenMetadata_, replicationStrategy_);
+        for (String table : DatabaseDescriptor.getNonSystemTables())
+            calculatePendingRanges(getReplicationStrategy(table), table);
     }
 
     // public & static for testing purposes
-    public static void calculatePendingRanges(TokenMetadata tm, AbstractReplicationStrategy strategy)
+    public static void calculatePendingRanges(AbstractReplicationStrategy strategy, String table)
     {
+        TokenMetadata tm = StorageService.instance.getTokenMetadata();
         Multimap<Range, InetAddress> pendingRanges = HashMultimap.create();
         Map<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
         Set<InetAddress> leavingEndPoints = tm.getLeavingEndPoints();
@@ -606,12 +626,12 @@
         if (bootstrapTokens.isEmpty() && leavingEndPoints.isEmpty())
         {
             if (logger_.isDebugEnabled())
-                logger_.debug("No bootstrapping or leaving nodes -> empty pending ranges");
-            tm.setPendingRanges(pendingRanges);
+                logger_.debug("No bootstrapping or leaving nodes -> empty pending ranges for " + table);
+            tm.setPendingRanges(table, pendingRanges);
             return;
         }
 
-        Multimap<InetAddress, Range> addressRanges = strategy.getAddressRanges();
+        Multimap<InetAddress, Range> addressRanges = strategy.getAddressRanges(table);
 
         // Copy of metadata reflecting the situation after all leave operations are finished.
         TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft();
@@ -625,8 +645,8 @@
         // all leaving nodes are gone.
         for (Range range : affectedRanges)
         {
-            List<InetAddress> currentEndPoints = strategy.getNaturalEndpoints(range.right, tm);
-            List<InetAddress> newEndPoints = strategy.getNaturalEndpoints(range.right, allLeftMetadata);
+            List<InetAddress> currentEndPoints = strategy.getNaturalEndpoints(range.right, tm, table);
+            List<InetAddress> newEndPoints = strategy.getNaturalEndpoints(range.right, allLeftMetadata, table);
             newEndPoints.removeAll(currentEndPoints);
             pendingRanges.putAll(range, newEndPoints);
         }
@@ -641,19 +661,19 @@
             InetAddress endPoint = entry.getValue();
 
             allLeftMetadata.updateNormalToken(entry.getKey(), endPoint);
-            for (Range range : strategy.getAddressRanges(allLeftMetadata).get(endPoint))
+            for (Range range : strategy.getAddressRanges(allLeftMetadata, table).get(endPoint))
                 pendingRanges.put(range, endPoint);
             allLeftMetadata.removeEndpoint(endPoint);
         }
 
-        tm.setPendingRanges(pendingRanges);
+        tm.setPendingRanges(table, pendingRanges);
 
         if (logger_.isDebugEnabled())
             logger_.debug("Pending ranges:\n" + (pendingRanges.isEmpty() ? "<empty>" : tm.printPendingRanges()));
     }
 
     /**
-     * Called when endPoint is removed from the ring without proper
+     * Called when an endPoint is removed from the ring without proper
      * STATE_LEAVING -> STATE_LEFT sequence. This function checks
      * whether this node becomes responsible for new ranges as a
      * consequence and streams data if needed.
@@ -667,57 +687,66 @@
     {
         InetAddress myAddress = FBUtilities.getLocalAddress();
 
-        // get all ranges that change ownership (that is, a node needs
-        // to take responsibility for new range)
-        Multimap<Range, InetAddress> changedRanges = getChangedRangesForLeaving(endPoint);
-
-        // check if any of these ranges are coming our way
-        Set<Range> myNewRanges = new HashSet<Range>();
-        for (Map.Entry<Range, InetAddress> entry : changedRanges.entries())
+        for (String table : DatabaseDescriptor.getNonSystemTables())
         {
-            if (entry.getValue().equals(myAddress))
-                myNewRanges.add(entry.getKey());
-        }
-
-        if (!myNewRanges.isEmpty())
-        {
-            if (logger_.isDebugEnabled())
-                logger_.debug(endPoint + " was removed, my added ranges: " + StringUtils.join(myNewRanges, ", "));
-
-            Multimap<Range, InetAddress> rangeAddresses = replicationStrategy_.getRangeAddresses(tokenMetadata_);
-            Multimap<InetAddress, Range> sourceRanges = HashMultimap.create();
-            IFailureDetector failureDetector = FailureDetector.instance;
+            // get all ranges that change ownership (that is, a node needs
+            // to take responsibility for new range)
+            Multimap<Range, InetAddress> changedRanges = getChangedRangesForLeaving(table, endPoint);
+
+            // check if any of these ranges are coming our way
+            Set<Range> myNewRanges = new HashSet<Range>();
+            for (Map.Entry<Range, InetAddress> entry : changedRanges.entries())
+            {
+                if (entry.getValue().equals(myAddress))
+                    myNewRanges.add(entry.getKey());
+            }
 
-            // find alive sources for our new ranges
-            for (Range myNewRange : myNewRanges)
+            if (!myNewRanges.isEmpty())
             {
-                List<InetAddress> sources = DatabaseDescriptor.getEndPointSnitch().getSortedListByProximity(myAddress, rangeAddresses.get(myNewRange));
+                if (logger_.isDebugEnabled())
+                    logger_.debug(endPoint + " was removed, my added ranges: " + StringUtils.join(myNewRanges, ", "));
 
-                assert (!sources.contains(myAddress));
+                Multimap<Range, InetAddress> rangeAddresses = getReplicationStrategy(table).getRangeAddresses(tokenMetadata_, table);
+                Multimap<InetAddress, Range> sourceRanges = HashMultimap.create();
+                IFailureDetector failureDetector = FailureDetector.instance;
 
-                for (InetAddress source : sources)
+                // find alive sources for our new ranges
+                for (Range myNewRange : myNewRanges)
                 {
-                    if (source.equals(endPoint))
-                        continue;
+                    List<InetAddress> sources = DatabaseDescriptor.getEndPointSnitch(table).getSortedListByProximity(myAddress, rangeAddresses.get(myNewRange));
 
-                    if (failureDetector.isAlive(source))
+                    assert (!sources.contains(myAddress));
+
+                    for (InetAddress source : sources)
                     {
-                        sourceRanges.put(source, myNewRange);
-                        break;
+                        if (source.equals(endPoint))
+                            continue;
+
+                        if (failureDetector.isAlive(source))
+                        {
+                            sourceRanges.put(source, myNewRange);
+                            break;
+                        }
                     }
                 }
-            }
 
-            // Finally we have a list of addresses and ranges to stream. Proceed to stream
-            for (Map.Entry<InetAddress, Collection<Range>> entry : sourceRanges.asMap().entrySet())
-                StreamIn.requestRanges(entry.getKey(), entry.getValue());
+                // Finally we have a list of addresses and ranges to
+                // stream. Proceed to stream
+                for (Map.Entry<InetAddress, Collection<Range>> entry : sourceRanges.asMap().entrySet())
+                {
+                    if (logger_.isDebugEnabled())
+                        logger_.debug("Requesting from " + entry.getKey() + " ranges " + StringUtils.join(entry.getValue(), ", "));
+                    StreamIn.requestRanges(entry.getKey(), table, entry.getValue());
+                }
+            }
         }
     }
 
-    private Multimap<Range, InetAddress> getChangedRangesForLeaving(InetAddress endpoint)
+    // needs to be modified to accept either a table or ARS.
+    private Multimap<Range, InetAddress> getChangedRangesForLeaving(String table, InetAddress endpoint)
     {
         // First get all ranges the leaving endpoint is responsible for
-        Collection<Range> ranges = getRangesForEndPoint(endpoint);
+        Collection<Range> ranges = getRangesForEndPoint(table, endpoint);
 
         if (logger_.isDebugEnabled())
             logger_.debug("Node " + endpoint + " ranges [" + StringUtils.join(ranges, ", ") + "]");
@@ -726,7 +755,7 @@
 
         // Find (for each range) all nodes that store replicas for these ranges as well
         for (Range range : ranges)
-            currentReplicaEndpoints.put(range, replicationStrategy_.getNaturalEndpoints(range.right, tokenMetadata_));
+            currentReplicaEndpoints.put(range, getReplicationStrategy(table).getNaturalEndpoints(range.right, tokenMetadata_, table));
 
         TokenMetadata temp = tokenMetadata_.cloneAfterAllLeft();
 
@@ -744,7 +773,7 @@
         // range.
         for (Range range : ranges)
         {
-            ArrayList<InetAddress> newReplicaEndpoints = replicationStrategy_.getNaturalEndpoints(range.right, temp);
+            ArrayList<InetAddress> newReplicaEndpoints = getReplicationStrategy(table).getNaturalEndpoints(range.right, temp, table);
             newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range));
             if (logger_.isDebugEnabled())
                 if (newReplicaEndpoints.isEmpty())
@@ -872,14 +901,14 @@
 
     public void forceTableCleanup() throws IOException
     {
-        for (Table table : Table.all())
+        List<String> tables = DatabaseDescriptor.getNonSystemTables();
+        for (String tName : tables)
         {
-            if (table.name.equals(Table.SYSTEM_TABLE))
-                continue;
+            Table table = Table.open(tName);
             table.forceCleanup();
         }
     }
-    
+
     public void forceTableCompaction() throws IOException
     {
         for (Table table : Table.all())
@@ -888,7 +917,7 @@
 
     /**
      * Takes the snapshot for a given table.
-     * 
+     *
      * @param tableName the name of the table.
      * @param tag   the tag given to the snapshot (null is permissible)
      */
@@ -909,7 +938,7 @@
 
     /**
      * Takes a snapshot for every table.
-     * 
+     *
      * @param tag the tag given to the snapshot (null is permissible)
      */
     public void takeAllSnapshot(String tag) throws IOException
@@ -925,6 +954,7 @@
     {
         for (Table table : Table.all())
             table.clearSnapshot();
+
         if (logger_.isDebugEnabled())
             logger_.debug("Cleared out all snapshot directories");
     }
@@ -975,7 +1005,7 @@
     {
         // request that all relevant endpoints generate trees
         final MessagingService ms = MessagingService.instance;
-        final List<InetAddress> endpoints = getNaturalEndpoints(getLocalToken());
+        final List<InetAddress> endpoints = getNaturalEndpoints(tableName, getLocalToken());
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
         {
             Message request = TreeRequestVerbHandler.makeVerb(tableName, cfStore.getColumnFamilyName());
@@ -985,7 +1015,7 @@
     }
 
     /* End of MBean interface methods */
-    
+
     /**
      * This method returns the predecessor of the endpoint ep on the identifier
      * space.
@@ -1015,20 +1045,20 @@
     {
         return tokenMetadata_.getPrimaryRangeFor(tokenMetadata_.getToken(ep));
     }
-    
+
     /**
      * Get all ranges an endpoint is responsible for.
      * @param ep endpoint we are interested in.
      * @return ranges for the specified endpoint.
      */
-    Collection<Range> getRangesForEndPoint(InetAddress ep)
+    Collection<Range> getRangesForEndPoint(String table, InetAddress ep)
     {
-        return replicationStrategy_.getAddressRanges().get(ep);
+        return getReplicationStrategy(table).getAddressRanges(table).get(ep);
     }
-        
+
     /**
      * Get all ranges that span the ring given a set
-     * of tokens. All ranges are in sorted order of 
+     * of tokens. All ranges are in sorted order of
      * ranges.
      * @return ranges in sorted order
     */
@@ -1108,10 +1138,10 @@
      * @param key - key for which we need to find the endpoint return value -
      * the endpoint responsible for this key
      */
-    public List<InetAddress> getNaturalEndpoints(String key)
+    public List<InetAddress> getNaturalEndpoints(String table, String key)
     {
-        return getNaturalEndpoints(partitioner_.getToken(key));
-    }    
+        return getNaturalEndpoints(table, partitioner_.getToken(key));
+    }
 
     /**
      * This method returns the N endpoints that are responsible for storing the
@@ -1120,11 +1150,11 @@
      * @param token - token for which we need to find the endpoint return value -
      * the endpoint responsible for this token
      */
-    public List<InetAddress> getNaturalEndpoints(Token token)
+    public List<InetAddress> getNaturalEndpoints(String table, Token token)
     {
-        return replicationStrategy_.getNaturalEndpoints(token);
-    }    
-    
+        return getReplicationStrategy(table).getNaturalEndpoints(token, table);
+    }
+
     /**
      * This method attempts to return N endpoints that are responsible for storing the
      * specified key i.e for replication.
@@ -1132,15 +1162,15 @@
      * @param key - key for which we need to find the endpoint return value -
      * the endpoint responsible for this key
      */
-    public List<InetAddress> getLiveNaturalEndpoints(String key)
+    public List<InetAddress> getLiveNaturalEndpoints(String table, String key)
     {
-        return getLiveNaturalEndpoints(partitioner_.getToken(key));
+        return getLiveNaturalEndpoints(table, partitioner_.getToken(key));
     }
 
-    public List<InetAddress> getLiveNaturalEndpoints(Token token)
+    public List<InetAddress> getLiveNaturalEndpoints(String table, Token token)
     {
         List<InetAddress> liveEps = new ArrayList<InetAddress>();
-        List<InetAddress> endpoints = replicationStrategy_.getNaturalEndpoints(token);
+        List<InetAddress> endpoints = getReplicationStrategy(table).getNaturalEndpoints(token, table);
 
         for (InetAddress endpoint : endpoints)
         {
@@ -1158,18 +1188,18 @@
      * @param key - key for which we need to find the endpoint return value -
      * the endpoint responsible for this key
      */
-    public Map<InetAddress, InetAddress> getHintedEndpointMap(String key, List<InetAddress> naturalEndpoints)
+    public Map<InetAddress, InetAddress> getHintedEndpointMap(String table, String key, List<InetAddress> naturalEndpoints)
     {
-        return replicationStrategy_.getHintedEndpoints(partitioner_.getToken(key), naturalEndpoints);
+        return getReplicationStrategy(table).getHintedEndpoints(partitioner_.getToken(key), table, naturalEndpoints);
     }
 
     /**
      * This function finds the closest live endpoint that contains a given key.
      */
-    public InetAddress findSuitableEndPoint(String key) throws IOException, UnavailableException
+    public InetAddress findSuitableEndPoint(String table, String key) throws IOException, UnavailableException
     {
-        List<InetAddress> endpoints = getNaturalEndpoints(key);
-        endPointSnitch_.sortByProximity(FBUtilities.getLocalAddress(), endpoints);
+        List<InetAddress> endpoints = getNaturalEndpoints(table, key);
+        DatabaseDescriptor.getEndPointSnitch(table).sortByProximity(FBUtilities.getLocalAddress(), endpoints);
         for (InetAddress endpoint : endpoints)
         {
             if (FailureDetector.instance.isAlive(endpoint))
@@ -1201,7 +1231,7 @@
      * There will be 1 more token than splits requested.  So for splits of 2, tokens T1 T2 T3 will be returned,
      * where (T1, T2] is the first range and (T2, T3] is the second.  The first token will always be the left
      * Token of this node's primary range, and the last will always be the Right token of that range.
-     */ 
+     */
     public List<String> getSplits(int splits)
     {
         assert splits > 1;
@@ -1260,8 +1290,11 @@
             throw new UnsupportedOperationException("local node is not a member of the token ring yet");
         if (tokenMetadata_.cloneAfterAllLeft().sortedTokens().size() < 2)
             throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless");
-        if (tokenMetadata_.getPendingRanges(FBUtilities.getLocalAddress()).size() > 0)
-            throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
+        for (String table : DatabaseDescriptor.getNonSystemTables())
+        {
+            if (tokenMetadata_.getPendingRanges(table, FBUtilities.getLocalAddress()).size() > 0)
+                throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
+        }
 
         logger_.info("DECOMMISSIONING");
         startLeaving();
@@ -1302,44 +1335,56 @@
 
     private void unbootstrap(final Runnable onFinish)
     {
-        Multimap<Range, InetAddress> rangesMM = getChangedRangesForLeaving(FBUtilities.getLocalAddress());
-        if (logger_.isDebugEnabled())
-            logger_.debug("Ranges needing transfer are [" + StringUtils.join(rangesMM.keySet(), ",") + "]");
-
-        if (rangesMM.isEmpty())
+        final CountDownLatch latch = new CountDownLatch(DatabaseDescriptor.getNonSystemTables().size());
+        for (final String table : DatabaseDescriptor.getNonSystemTables())
         {
-            // nothing needs transfer, so leave immediately.  this can happen when replication factor == number of nodes.
-            leaveRing();
-            onFinish.run();
-            return;
-        }
+            Multimap<Range, InetAddress> rangesMM = getChangedRangesForLeaving(table, FBUtilities.getLocalAddress());
+            if (logger_.isDebugEnabled())
+                logger_.debug("Ranges needing transfer are [" + StringUtils.join(rangesMM.keySet(), ",") + "]");
+            if (rangesMM.isEmpty())
+            {
+                latch.countDown();
+                continue;
+            }
 
-        final Set<Map.Entry<Range, InetAddress>> pending = new HashSet<Map.Entry<Range, InetAddress>>(rangesMM.entries());
-        for (final Map.Entry<Range, InetAddress> entry : rangesMM.entries())
-        {
-            final Range range = entry.getKey();
-            final InetAddress newEndpoint = entry.getValue();
-            final Runnable callback = new Runnable()
+            final Set<Map.Entry<Range, InetAddress>> pending = Collections.synchronizedSet(new HashSet<Map.Entry<Range, InetAddress>>(rangesMM.entries()));
+            for (final Map.Entry<Range, InetAddress> entry : rangesMM.entries())
             {
-                public synchronized void run()
+                final Range range = entry.getKey();
+                final InetAddress newEndpoint = entry.getValue();
+                final Runnable callback = new Runnable()
                 {
-                    pending.remove(entry);
-                    if (pending.isEmpty())
+                    public void run()
                     {
-                        leaveRing();
-                        onFinish.run();
+                        pending.remove(entry);
+                        if (pending.isEmpty())
+                            latch.countDown();
                     }
-                }
-            };
-            StageManager.getStage(StageManager.STREAM_STAGE).execute(new Runnable()
-            {
-                public void run()
+                };
+                StageManager.getStage(StageManager.STREAM_STAGE).execute(new Runnable()
                 {
-                    // TODO each call to transferRanges re-flushes, this is potentially a lot of waste
-                    StreamOut.transferRanges(newEndpoint, Arrays.asList(range), callback);
-                }
-            });
+                    public void run()
+                    {
+                        // TODO each call to transferRanges re-flushes, this is potentially a lot of waste
+                        StreamOut.transferRanges(newEndpoint, table, Arrays.asList(range), callback);
+                    }
+                });
+            }
+        }
+
+        // wait for the transfer runnables to signal the latch.
+        logger_.debug("waiting for stream aks.");
+        try
+        {
+            latch.await();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
         }
+        logger_.debug("stream acks all received.");
+        leaveRing();
+        onFinish.run();
     }
 
     public void move(String newToken) throws InterruptedException
@@ -1359,8 +1404,11 @@
      */
     private void move(final Token token) throws InterruptedException
     {
-        if (tokenMetadata_.getPendingRanges(FBUtilities.getLocalAddress()).size() > 0)
-            throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
+        for (String table : DatabaseDescriptor.getTables())
+        {
+            if (tokenMetadata_.getPendingRanges(table, FBUtilities.getLocalAddress()).size() > 0)
+                throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
+        }
 
         logger_.info("starting move. leaving token " + getLocalToken());
         startLeaving();
@@ -1409,19 +1457,14 @@
         // to add new AP state for this command, but that would again
         // increase the amount of data to be gossiped in the cluster -
         // not good. REMOVE_TOKEN|LEFT_NORMALLY is used to distinguish
-        // between removetoken command and normal state left, so it is
+        // between ``removetoken command and normal state left, so it is
         // not so bad.
         Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_LEFT + Delimiter + REMOVE_TOKEN + Delimiter + token.toString()));
     }
 
-    public WriteResponseHandler getWriteResponseHandler(int blockFor, ConsistencyLevel consistency_level)
+    public WriteResponseHandler getWriteResponseHandler(int blockFor, ConsistencyLevel consistency_level, String table)
     {
-        return replicationStrategy_.getWriteResponseHandler(blockFor, consistency_level);
-    }
-
-    public AbstractReplicationStrategy getReplicationStrategy()
-    {
-        return replicationStrategy_;
+        return getReplicationStrategy(table).getWriteResponseHandler(blockFor, consistency_level, table);
     }
 
     public boolean isClientMode()
@@ -1430,11 +1473,11 @@
     }
 
     // Never ever do this at home. Used by tests.
-    AbstractReplicationStrategy setReplicationStrategyUnsafe(AbstractReplicationStrategy newStrategy)
+    Map<String, AbstractReplicationStrategy> setReplicationStrategyUnsafe(Map<String, AbstractReplicationStrategy> replacement)
     {
-        AbstractReplicationStrategy oldStrategy = replicationStrategy_;
-        replicationStrategy_ = newStrategy;
-        return oldStrategy;
+        Map<String, AbstractReplicationStrategy> old = replicationStrategies;
+        replicationStrategies = replacement;
+        return old;
     }
 
     // Never ever do this at home. Used by tests.
@@ -1445,4 +1488,11 @@
         return oldPartitioner;
     }
 
+    TokenMetadata setTokenMetadataUnsafe(TokenMetadata tmd)
+    {
+        TokenMetadata old = tokenMetadata_;
+        tokenMetadata_ = tmd;
+        return old;
+    }
+
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Thu Feb  4 15:21:31 2010
@@ -60,7 +60,7 @@
      *
      * @return mapping of ranges to end points
      */
-    public Map<Range, List<String>> getRangeToEndPointMap();
+    public Map<Range, List<String>> getRangeToEndPointMap(String keyspace);
 
     /** Human-readable load value */
     public String getLoadString();
@@ -82,7 +82,7 @@
      * @param key - key for which we need to find the endpoint return value -
      * the endpoint responsible for this key
      */
-    public List<InetAddress> getNaturalEndpoints(String key);
+    public List<InetAddress> getNaturalEndpoints(String key, String table);
 
     /**
      * Forces major compaction (all sstable files compacted)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java Thu Feb  4 15:21:31 2010
@@ -41,11 +41,11 @@
     protected int localResponses;
     private final long startTime;
 
-    public WriteResponseHandler(int responseCount)
+    public WriteResponseHandler(int responseCount, String table)
     {
         // at most one node per range can bootstrap at a time, and these will be added to the write until
         // bootstrap finishes (at which point we no longer need to write to the old ones).
-        assert 1 <= responseCount && responseCount <= 2 * DatabaseDescriptor.getReplicationFactor()
+        assert 1 <= responseCount && responseCount <= 2 * DatabaseDescriptor.getReplicationFactor(table)
             : "invalid response count " + responseCount;
 
         this.responseCount = responseCount;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Thu Feb  4 15:21:31 2010
@@ -1,30 +1,30 @@
-package org.apache.cassandra.streaming;
-
-import java.net.InetAddress;
-import java.util.Collection;
-
-import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
-
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
-
-/** for streaming data from other nodes in to this one */
-public class StreamIn
-{
-    private static Logger logger = Logger.getLogger(StreamOut.class);
-
-    /**
-     * Request ranges to be transferred from source to local node
-     */
-    public static void requestRanges(InetAddress source, Collection<Range> ranges)
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", "));
-        StreamRequestMetadata streamRequestMetadata = new StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges);
-        Message message = StreamRequestMessage.makeStreamRequestMessage(new StreamRequestMessage(streamRequestMetadata));
-        MessagingService.instance.sendOneWay(message, source);
-    }
-}
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.util.Collection;
+
+import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/** for streaming data from other nodes in to this one */
+public class StreamIn
+{
+    private static Logger logger = Logger.getLogger(StreamOut.class);
+
+    /**
+     * Request ranges to be transferred from source to local node
+     */
+    public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges)
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", "));
+        StreamRequestMetadata streamRequestMetadata = new StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges, tableName);
+        Message message = StreamRequestMessage.makeStreamRequestMessage(new StreamRequestMessage(streamRequestMetadata));
+        MessagingService.instance.sendOneWay(message, source);
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Thu Feb  4 15:21:31 2010
@@ -63,7 +63,7 @@
     /**
      * Split out files for all tables on disk locally for each range and then stream them to the target endpoint.
     */
-    public static void transferRanges(InetAddress target, Collection<Range> ranges, Runnable callback)
+    public static void transferRanges(InetAddress target, String tableName, Collection<Range> ranges, Runnable callback)
     {
         assert ranges.size() > 0;
 
@@ -75,36 +75,34 @@
          * (2) anticompaction -- split out the keys in the range specified
          * (3) transfer the data.
         */
-        for (Table table : Table.all())
+        try
         {
-            try
+            Table table = Table.open(tableName);
+            if (logger.isDebugEnabled())
+                logger.debug("Flushing memtables ...");
+            for (Future f : table.flush())
             {
-                if (logger.isDebugEnabled())
-                  logger.debug("Flushing memtables ...");
-                for (Future f : table.flush())
+                try
                 {
-                    try
-                    {
-                        f.get();
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new RuntimeException(e);
-                    }
-                    catch (ExecutionException e)
-                    {
-                        throw new RuntimeException(e);
-                    }
+                    f.get();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                catch (ExecutionException e)
+                {
+                    throw new RuntimeException(e);
                 }
-                if (logger.isDebugEnabled())
-                  logger.debug("Performing anticompaction ...");
-                /* Get the list of files that need to be streamed */
-                transferSSTables(target, table.forceAntiCompaction(ranges, target), table.name); // SSTR GC deletes the file when done
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
             }
+            if (logger.isDebugEnabled())
+                logger.debug("Performing anticompaction ...");
+            /* Get the list of files that need to be streamed */
+            transferSSTables(target, table.forceAntiCompaction(ranges, target), tableName); // SSTR GC deletes the file when done
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
         }
         if (callback != null)
             callback.run();
@@ -127,7 +125,7 @@
             }
         }
         if (logger.isDebugEnabled())
-          logger.debug("Stream context metadata " + StringUtils.join(pendingFiles, ", "));
+          logger.debug("Stream context metadata " + StringUtils.join(pendingFiles, ", " + " " + sstables.size() + " sstables."));
 
         StreamOutManager.get(target).addFilesToStream(pendingFiles);
         StreamInitiateMessage biMessage = new StreamInitiateMessage(pendingFiles);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java Thu Feb  4 15:21:31 2010
@@ -1,80 +1,86 @@
-package org.apache.cassandra.streaming;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-
-/**
- * This encapsulates information of the list of ranges that a target
- * node requires to be transferred. This will be bundled in a
- * StreamRequestsMessage and sent to nodes that are going to handoff
- * the data.
-*/
-class StreamRequestMetadata
-{
-    private static ICompactSerializer<StreamRequestMetadata> serializer_;
-    static
-    {
-        serializer_ = new StreamRequestMetadataSerializer();
-    }
-
-    protected static ICompactSerializer<StreamRequestMetadata> serializer()
-    {
-        return serializer_;
-    }
-
-    protected InetAddress target_;
-    protected Collection<Range> ranges_;
-
-    StreamRequestMetadata(InetAddress target, Collection<Range> ranges)
-    {
-        target_ = target;
-        ranges_ = ranges;
-    }
-
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder("");
-        sb.append(target_);
-        sb.append("------->");
-        for ( Range range : ranges_ )
-        {
-            sb.append(range);
-            sb.append(" ");
-        }
-        return sb.toString();
-    }
-
-    private static class StreamRequestMetadataSerializer implements ICompactSerializer<StreamRequestMetadata>
-    {
-        public void serialize(StreamRequestMetadata srMetadata, DataOutputStream dos) throws IOException
-        {
-            CompactEndPointSerializationHelper.serialize(srMetadata.target_, dos);
-            dos.writeInt(srMetadata.ranges_.size());
-            for (Range range : srMetadata.ranges_)
-            {
-                Range.serializer().serialize(range, dos);
-            }
-        }
-
-        public StreamRequestMetadata deserialize(DataInputStream dis) throws IOException
-        {
-            InetAddress target = CompactEndPointSerializationHelper.deserialize(dis);
-            int size = dis.readInt();
-            List<Range> ranges = (size == 0) ? null : new ArrayList<Range>();
-            for( int i = 0; i < size; ++i )
-            {
-                ranges.add(Range.serializer().deserialize(dis));
-            }
-            return new StreamRequestMetadata( target, ranges );
-        }
-    }
-}
+package org.apache.cassandra.streaming;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+
+/**
+ * This encapsulates information of the list of ranges that a target
+ * node requires to be transferred. This will be bundled in a
+ * StreamRequestsMessage and sent to nodes that are going to handoff
+ * the data.
+*/
+class StreamRequestMetadata
+{
+    private static ICompactSerializer<StreamRequestMetadata> serializer_;
+    static
+    {
+        serializer_ = new StreamRequestMetadataSerializer();
+    }
+
+    protected static ICompactSerializer<StreamRequestMetadata> serializer()
+    {
+        return serializer_;
+    }
+
+    protected InetAddress target_;
+    protected Collection<Range> ranges_;
+    protected String table_;
+
+    StreamRequestMetadata(InetAddress target, Collection<Range> ranges, String table)
+    {
+        target_ = target;
+        ranges_ = ranges;
+        table_ = table;
+    }
+
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder("");
+        sb.append(table_);
+        sb.append("@");
+        sb.append(target_);
+        sb.append("------->");
+        for ( Range range : ranges_ )
+        {
+            sb.append(range);
+            sb.append(" ");
+        }
+        return sb.toString();
+    }
+}
+
+class StreamRequestMetadataSerializer implements ICompactSerializer<StreamRequestMetadata>
+{
+    public void serialize(StreamRequestMetadata srMetadata, DataOutputStream dos) throws IOException
+    {
+        CompactEndPointSerializationHelper.serialize(srMetadata.target_, dos);
+        dos.writeUTF(srMetadata.table_);
+        dos.writeInt(srMetadata.ranges_.size());
+        for (Range range : srMetadata.ranges_)
+        {
+            Range.serializer().serialize(range, dos);
+        }
+    }
+
+    public StreamRequestMetadata deserialize(DataInputStream dis) throws IOException
+    {
+        InetAddress target = CompactEndPointSerializationHelper.deserialize(dis);
+        String table = dis.readUTF();
+        int size = dis.readInt();
+        List<Range> ranges = (size == 0) ? null : new ArrayList<Range>();
+        for( int i = 0; i < size; ++i )
+        {
+            ranges.add(Range.serializer().deserialize(dis));
+        }
+        return new StreamRequestMetadata(target, ranges, table);
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java Thu Feb  4 15:21:31 2010
@@ -52,7 +52,7 @@
             {
                 if (logger_.isDebugEnabled())
                     logger_.debug(srm.toString());
-                StreamOut.transferRanges(srm.target_, srm.ranges_, null);
+                StreamOut.transferRanges(srm.target_, srm.table_, srm.ranges_, null);
             }
         }
         catch (IOException ex)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/ClusterCmd.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/ClusterCmd.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/ClusterCmd.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/ClusterCmd.java Thu Feb  4 15:21:31 2010
@@ -144,9 +144,9 @@
         hf.printHelp(usage, "", options, header);
     }
     
-    public void printEndPoints(String key)
+    public void printEndPoints(String key, String table)
     {
-        List<InetAddress> endpoints = probe.getEndPoints(key);
+        List<InetAddress> endpoints = probe.getEndPoints(key, table);
         System.out.println(String.format("%-17s: %s", "Key", key));
         System.out.println(String.format("%-17s: %s", "Endpoints", endpoints));
     }
@@ -254,21 +254,21 @@
         String cmdName = arguments[0];
         if (cmdName.equals("get_endpoints"))
         {
-            if (arguments.length <= 1)
+            if (arguments.length <= 2)
             {
-                System.err.println("missing key argument");
+                System.err.println("missing key and/or table argument");
             }
-            clusterCmd.printEndPoints(arguments[1]);
+            clusterCmd.printEndPoints(arguments[1], arguments[2]);
         }
-	else if (cmdName.equals("global_snapshot"))
-	{
+        else if (cmdName.equals("global_snapshot"))
+        {
             String snapshotName = "";
             if (arguments.length > 1)
             {
                 snapshotName = arguments[1];
             }
-	    clusterCmd.takeGlobalSnapshot(snapshotName);
-	}
+            clusterCmd.takeGlobalSnapshot(snapshotName);
+        }
         else if (cmdName.equals("clear_global_snapshot"))
         {
             clusterCmd.clearGlobalSnapshot();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Thu Feb  4 15:21:31 2010
@@ -66,7 +66,7 @@
      */
     public void printRing(PrintStream outs)
     {
-        Map<Range, List<String>> rangeMap = probe.getRangeToEndPointMap();
+        Map<Range, List<String>> rangeMap = probe.getRangeToEndPointMap(null);
         List<Range> ranges = new ArrayList<Range>(rangeMap.keySet());
         Collections.sort(ranges);
         Set<String> liveNodes = probe.getLiveNodes();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Thu Feb  4 15:21:31 2010
@@ -26,6 +26,8 @@
 import java.lang.management.RuntimeMXBean;
 import java.net.InetAddress;
 import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -139,15 +141,84 @@
         ssProxy.forceTableRepair(tableName, columnFamilies);
     }
     
-    public Map<Range, List<String>> getRangeToEndPointMap()
+    public Map<Range, List<String>> getRangeToEndPointMap(String tableName)
     {
-        return ssProxy.getRangeToEndPointMap();
+        return ssProxy.getRangeToEndPointMap(tableName);
     }
     
     public Set<String> getLiveNodes()
     {
         return ssProxy.getLiveNodes();
     }
+
+    /**
+     * Write a textual representation of the Cassandra ring.
+     * 
+     * @param outs the stream to write to
+     */
+    public void printRing(PrintStream outs)
+    {
+        Map<Range, List<String>> rangeMap = ssProxy.getRangeToEndPointMap(null);
+        List<Range> ranges = new ArrayList<Range>(rangeMap.keySet());
+        Collections.sort(ranges);
+        Set<String> liveNodes = ssProxy.getLiveNodes();
+        Set<String> deadNodes = ssProxy.getUnreachableNodes();
+        Map<String, String> loadMap = ssProxy.getLoadMap();
+
+        // Print range-to-endpoint mapping
+        int counter = 0;
+        outs.print(String.format("%-14s", "Address"));
+        outs.print(String.format("%-11s", "Status"));
+        outs.print(String.format("%-14s", "Load"));
+        outs.print(String.format("%-43s", "Range"));
+        outs.println("Ring");
+        // emphasize that we're showing the right part of each range
+        if (ranges.size() > 1)
+        {
+            outs.println(String.format("%-14s%-11s%-14s%-43s", "", "", "", ranges.get(0).left));
+        }
+        // normal range & node info
+        for (Range range : ranges) {
+            List<String> endpoints = rangeMap.get(range);
+            String primaryEndpoint = endpoints.get(0);
+
+            outs.print(String.format("%-14s", primaryEndpoint));
+
+            String status = liveNodes.contains(primaryEndpoint)
+                          ? "Up"
+                          : deadNodes.contains(primaryEndpoint)
+                            ? "Down"
+                            : "?";
+            outs.print(String.format("%-11s", status));
+
+            String load = loadMap.containsKey(primaryEndpoint) ? loadMap.get(primaryEndpoint) : "?";
+            outs.print(String.format("%-14s", load));
+
+            outs.print(String.format("%-43s", range.right));
+
+            String asciiRingArt;
+            if (counter == 0)
+            {
+                asciiRingArt = "|<--|";
+            }
+            else if (counter == (rangeMap.size() - 1))
+            {
+                asciiRingArt = "|-->|";
+            }
+            else
+            {
+                if ((rangeMap.size() > 4) && ((counter % 2) == 0))
+                    asciiRingArt = "v   |";
+                else if ((rangeMap.size() > 4) && ((counter % 2) != 0))
+                    asciiRingArt = "|   ^";
+                else
+                    asciiRingArt = "|   |";
+            }
+            outs.println(asciiRingArt);
+            
+            counter++;
+        }
+    }
     
     public Set<String> getUnreachableNodes()
     {
@@ -325,9 +396,9 @@
         }
     }
 
-    public List<InetAddress> getEndPoints(String key)
+    public List<InetAddress> getEndPoints(String key, String table)
     {
-        return ssProxy.getNaturalEndpoints(key);
+        return ssProxy.getNaturalEndpoints(key, table);
     }
 }
 

Modified: incubator/cassandra/trunk/test/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/conf/storage-conf.xml?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/test/conf/storage-conf.xml Thu Feb  4 15:21:31 2010
@@ -23,9 +23,6 @@
    <CommitLogSync>batch</CommitLogSync>
    <CommitLogSyncBatchWindowInMS>1.0</CommitLogSyncBatchWindowInMS>
    <Partitioner>org.apache.cassandra.dht.CollatingOrderPreservingPartitioner</Partitioner>
-   <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
-   <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
-   <ReplicationFactor>1</ReplicationFactor>
    <RpcTimeoutInMillis>5000</RpcTimeoutInMillis>
    <ListenAddress>127.0.0.1</ListenAddress>
    <StoragePort>7010</StoragePort>
@@ -51,12 +48,33 @@
        <ColumnFamily ColumnType="Super" CompareSubcolumnsWith="LongType" Name="Super2"/>
        <ColumnFamily ColumnType="Super" CompareSubcolumnsWith="LongType" Name="Super3"/>
        <ColumnFamily ColumnType="Super" CompareSubcolumnsWith="UTF8Type" Name="Super4"/>
+       <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+       <ReplicationFactor>1</ReplicationFactor>
+       <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
      </Keyspace>
      <Keyspace Name = "Keyspace2">
        <ColumnFamily Name="Standard1"/>
        <ColumnFamily Name="Standard3"/>
        <ColumnFamily ColumnType="Super" Name="Super3"/>
        <ColumnFamily ColumnType="Super" CompareSubcolumnsWith="TimeUUIDType" Name="Super4"/>
+       <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+       <ReplicationFactor>1</ReplicationFactor>
+       <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
+     </Keyspace>
+     <Keyspace Name = "Keyspace3">
+       <ColumnFamily Name="Standard1"/>
+       <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+       <ReplicationFactor>5</ReplicationFactor>
+       <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
+     </Keyspace>
+     <Keyspace Name = "Keyspace4">
+       <ColumnFamily Name="Standard1"/>
+       <ColumnFamily Name="Standard3"/>
+       <ColumnFamily ColumnType="Super" Name="Super3"/>
+       <ColumnFamily ColumnType="Super" CompareSubcolumnsWith="TimeUUIDType" Name="Super4"/>
+       <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+       <ReplicationFactor>3</ReplicationFactor>
+       <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
      </Keyspace>
    </Keyspaces>
    <Seeds>

Modified: incubator/cassandra/trunk/test/system/test_server.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/test_server.py?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/system/test_server.py (original)
+++ incubator/cassandra/trunk/test/system/test_server.py Thu Feb  4 15:21:31 2010
@@ -873,7 +873,7 @@
     def test_describe_keyspace(self):
         """ Test keyspace description """
         kspaces = client.get_string_list_property("keyspaces")
-        assert len(kspaces) == 3, kspaces
+        assert len(kspaces) == 5, kspaces # ['Keyspace1', 'Keyspace2', 'Keyspace3', 'Keyspace4', 'system']
         ks1 = client.describe_keyspace("Keyspace1")
         assert set(ks1.keys()) == set(['Super1', 'Standard1', 'Standard2', 'StandardLong1', 'StandardLong2', 'Super3', 'Super2', 'Super4'])
         sysks = client.describe_keyspace("system")

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java Thu Feb  4 15:21:31 2010
@@ -59,30 +59,51 @@
     }
 
     /**
-     * usage: java -Dstorage-config="confpath" org.apache.cassandra.client.TestRingCache
+     * usage: java -Dstorage-config="confpath" org.apache.cassandra.client.TestRingCache [keyspace row-id-prefix row-id-int]
+     * to test a single keyspace/row, use the parameters. row-id-prefix and row-id-int are appended together to form a
+     * single row id.  If you supply now parameters, 'Keyspace1' is assumed and will check 9 rows ('row1' through 'row9').
      * @param args
      * @throws Exception
      */
     public static void main(String[] args) throws Throwable
     {
-        String table = "Keyspace1";
-        for (int nRows=1; nRows<10; nRows++)
+        String table;
+        int minRow;
+        int maxRow;
+        String rowPrefix;
+        if (args.length > 0)
         {
-            String row = "row" + nRows;
+            table = args[0];
+            rowPrefix = args[1];
+            minRow = Integer.parseInt(args[2]);
+            maxRow = minRow + 1;
+        }
+        else
+        {
+            table = "Keyspace1";
+            minRow = 1;
+            maxRow = 10;
+            rowPrefix = "row";
+        }
+
+        for (int nRows = minRow; nRows < maxRow; nRows++)
+        {
+            String row = rowPrefix + nRows;
             ColumnPath col = createColumnPath("Standard1", null, "col1".getBytes());
 
-            List<InetAddress> endPoints = ringCache.getEndPoint(row);
+            List<InetAddress> endPoints = ringCache.getEndPoint(table, row);
             String hosts="";
             for (int i = 0; i < endPoints.size(); i++)
                 hosts = hosts + ((i > 0) ? "," : "") + endPoints.get(i);
             System.out.println("hosts with key " + row + " : " + hosts + "; choose " + endPoints.get(0));
-        
+
             // now, read the row back directly from the host owning the row locally
             setup(endPoints.get(0).getHostAddress(), DatabaseDescriptor.getThriftPort());
             thriftClient.insert(table, row, col, "val1".getBytes(), 1, ConsistencyLevel.ONE);
             Column column=thriftClient.get(table, row, col, ConsistencyLevel.ONE).column;
             System.out.println("read row " + row + " " + new String(column.name) + ":" + new String(column.value) + ":" + column.timestamp);
         }
+
         System.exit(1);
     }
 }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java Thu Feb  4 15:21:31 2010
@@ -34,8 +34,8 @@
      * TODO: A more general method of property modification would be useful, but
      *       will probably have to wait for a refactor away from all the static fields.
      */
-    public static void setReplicationFactor(int factor)
+    public static void setReplicationFactor(String table, int factor)
     {
-        DatabaseDescriptor.setReplicationFactorUnsafe(factor);
+        DatabaseDescriptor.setReplicationFactorUnsafe(table, factor);
     }
 }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Thu Feb  4 15:21:31 2010
@@ -25,6 +25,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.commons.lang.StringUtils;
 import static org.junit.Assert.assertEquals;
 import org.junit.Test;
@@ -71,12 +72,17 @@
     @Test
     public void testSourceTargetComputation() throws UnknownHostException
     {
-        testSourceTargetComputation(1);
-        testSourceTargetComputation(3);
-        testSourceTargetComputation(100);
+        final int[] clusterSizes = new int[] { 1, 3, 5, 10, 100};
+        for (String table : DatabaseDescriptor.getNonSystemTables())
+        {
+            int replicationFactor = DatabaseDescriptor.getReplicationFactor(table);
+            for (int clusterSize : clusterSizes)
+                if (clusterSize >= replicationFactor)
+                    testSourceTargetComputation(table, clusterSize, replicationFactor);
+        }
     }
 
-    private void testSourceTargetComputation(int numOldNodes) throws UnknownHostException
+    private void testSourceTargetComputation(String table, int numOldNodes, int replicationFactor) throws UnknownHostException
     {
         StorageService ss = StorageService.instance;
 
@@ -86,8 +92,8 @@
 
         TokenMetadata tmd = ss.getTokenMetadata();
         assertEquals(numOldNodes, tmd.sortedTokens().size());
-        BootStrapper b = new BootStrapper(ss.getReplicationStrategy(), myEndpoint, myToken, tmd);
-        Multimap<Range, InetAddress> res = b.getRangesWithSources();
+        BootStrapper b = new BootStrapper(myEndpoint, myToken, tmd);
+        Multimap<Range, InetAddress> res = b.getRangesWithSources(table);
         
         int transferCount = 0;
         for (Map.Entry<Range, Collection<InetAddress>> e : res.asMap().entrySet())
@@ -96,8 +102,7 @@
             transferCount++;
         }
 
-        /* Only 1 transfer from old node to new node */
-        assertEquals(1, transferCount);
+        assertEquals(replicationFactor, transferCount);
         IFailureDetector mockFailureDetector = new IFailureDetector()
         {
             public boolean isAlive(InetAddress ep)
@@ -112,8 +117,10 @@
             public void remove(InetAddress ep) { throw new UnsupportedOperationException(); }
         };
         Multimap<InetAddress, Range> temp = BootStrapper.getWorkMap(res, mockFailureDetector);
-        assertEquals(1, temp.keySet().size());
-        assertEquals(1, temp.asMap().values().iterator().next().size());
+        // there isn't any point in testing the size of these collections for any specific size.  When a random partitioner
+        // is used, they will vary.
+        assert temp.keySet().size() > 0;
+        assert temp.asMap().values().iterator().next().size() > 0;
         assert !temp.keySet().iterator().next().equals(myEndpoint);
     }
 

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java Thu Feb  4 15:21:31 2010
@@ -25,6 +25,9 @@
 import java.util.ArrayList;
 import java.util.Collection;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.service.StorageServiceAccessor;
 import org.junit.Test;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
@@ -40,10 +43,26 @@
 public class RackUnawareStrategyTest
 {
     @Test
+    public void tryBogusTable()
+    {
+        AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy("Keyspace1");
+        assertNotNull(rs);
+        try
+        {
+            rs = StorageService.instance.getReplicationStrategy("SomeBogusTableThatDoesntExist");
+            throw new AssertionError("SS.getReplicationStrategy() should have thrown a RuntimeException.");
+        }
+        catch (RuntimeException ex)
+        {
+            // This exception should be thrown.
+        }
+    }
+
+    @Test
     public void testBigIntegerEndpoints() throws UnknownHostException
     {
         TokenMetadata tmd = new TokenMetadata();
-        AbstractReplicationStrategy strategy = new RackUnawareStrategy(tmd, 3);
+        AbstractReplicationStrategy strategy = new RackUnawareStrategy(tmd, null);
 
         List<Token> endPointTokens = new ArrayList<Token>();
         List<Token> keyTokens = new ArrayList<Token>();
@@ -51,7 +70,8 @@
             endPointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
             keyTokens.add(new BigIntegerToken(String.valueOf(10 * i + 5)));
         }
-        testGetEndpoints(tmd, strategy, endPointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]));
+        for (String table : DatabaseDescriptor.getNonSystemTables())
+            testGetEndpoints(tmd, strategy, endPointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]), table);
     }
 
     @Test
@@ -59,7 +79,7 @@
     {
         TokenMetadata tmd = new TokenMetadata();
         IPartitioner partitioner = new OrderPreservingPartitioner();
-        AbstractReplicationStrategy strategy = new RackUnawareStrategy(tmd, 3);
+        AbstractReplicationStrategy strategy = new RackUnawareStrategy(tmd, null);
 
         List<Token> endPointTokens = new ArrayList<Token>();
         List<Token> keyTokens = new ArrayList<Token>();
@@ -67,12 +87,13 @@
             endPointTokens.add(new StringToken(String.valueOf((char)('a' + i * 2))));
             keyTokens.add(partitioner.getToken(String.valueOf((char)('a' + i * 2 + 1))));
         }
-        testGetEndpoints(tmd, strategy, endPointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]));
+        for (String table : DatabaseDescriptor.getNonSystemTables())
+            testGetEndpoints(tmd, strategy, endPointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]), table);
     }
 
     // given a list of endpoint tokens, and a set of key tokens falling between the endpoint tokens,
     // make sure that the Strategy picks the right endpoints for the keys.
-    private void testGetEndpoints(TokenMetadata tmd, AbstractReplicationStrategy strategy, Token[] endPointTokens, Token[] keyTokens) throws UnknownHostException
+    private void testGetEndpoints(TokenMetadata tmd, AbstractReplicationStrategy strategy, Token[] endPointTokens, Token[] keyTokens, String table) throws UnknownHostException
     {
         List<InetAddress> hosts = new ArrayList<InetAddress>();
         for (int i = 0; i < endPointTokens.length; i++)
@@ -84,8 +105,8 @@
 
         for (int i = 0; i < keyTokens.length; i++)
         {
-            List<InetAddress> endPoints = strategy.getNaturalEndpoints(keyTokens[i]);
-            assertEquals(3, endPoints.size());
+            List<InetAddress> endPoints = strategy.getNaturalEndpoints(keyTokens[i], table);
+            assertEquals(DatabaseDescriptor.getReplicationFactor(table), endPoints.size());
             for (int j = 0; j < endPoints.size(); j++)
             {
                 assertEquals(endPoints.get(j), hosts.get((i + j + 1) % hosts.size()));
@@ -96,16 +117,19 @@
     @Test
     public void testGetEndpointsDuringBootstrap() throws UnknownHostException
     {
+        // the token difference will be RING_SIZE * 2.
+        final int RING_SIZE = 10;
         TokenMetadata tmd = new TokenMetadata();
-        AbstractReplicationStrategy strategy = new RackUnawareStrategy(tmd, 3);
+        TokenMetadata oldTmd = StorageServiceAccessor.setTokenMetadata(tmd);
+        AbstractReplicationStrategy strategy = new RackUnawareStrategy(tmd, null);
 
-        Token[] endPointTokens = new Token[5]; 
-        Token[] keyTokens = new Token[5];
+        Token[] endPointTokens = new Token[RING_SIZE];
+        Token[] keyTokens = new Token[RING_SIZE];
         
-        for (int i = 0; i < 5; i++) 
+        for (int i = 0; i < RING_SIZE; i++)
         {
-            endPointTokens[i] = new BigIntegerToken(String.valueOf(10 * i));
-            keyTokens[i] = new BigIntegerToken(String.valueOf(10 * i + 5));
+            endPointTokens[i] = new BigIntegerToken(String.valueOf(RING_SIZE * 2 * i));
+            keyTokens[i] = new BigIntegerToken(String.valueOf(RING_SIZE * 2 * i + RING_SIZE));
         }
         
         List<InetAddress> hosts = new ArrayList<InetAddress>();
@@ -115,28 +139,36 @@
             tmd.updateNormalToken(endPointTokens[i], ep);
             hosts.add(ep);
         }
-        
-        //Add bootstrap node id=6
-        Token bsToken = new BigIntegerToken(String.valueOf(25));
-        InetAddress bootstrapEndPoint = InetAddress.getByName("127.0.0.6");
+
+        // bootstrap at the end of the ring
+        Token bsToken = new BigIntegerToken(String.valueOf(210));
+        InetAddress bootstrapEndPoint = InetAddress.getByName("127.0.0.11");
         tmd.addBootstrapToken(bsToken, bootstrapEndPoint);
-        StorageService.calculatePendingRanges(tmd, strategy);
 
-        for (int i = 0; i < keyTokens.length; i++)
+        for (String table : DatabaseDescriptor.getNonSystemTables())
         {
-            Collection<InetAddress> endPoints = strategy.getWriteEndpoints(keyTokens[i], strategy.getNaturalEndpoints(keyTokens[i]));
-            assertTrue(endPoints.size() >= 3);
+            StorageService.calculatePendingRanges(strategy, table);
+            int replicationFactor = DatabaseDescriptor.getReplicationFactor(table);
 
-            for (int j = 0; j < 3; j++)
+            for (int i = 0; i < keyTokens.length; i++)
             {
-                //Check that the old nodes are definitely included
-                assertTrue(endPoints.contains(hosts.get((i + j + 1) % hosts.size())));
+                Collection<InetAddress> endPoints = strategy.getWriteEndpoints(keyTokens[i], table, strategy.getNaturalEndpoints(keyTokens[i], table));
+                assertTrue(endPoints.size() >= replicationFactor);
+
+                for (int j = 0; j < replicationFactor; j++)
+                {
+                    //Check that the old nodes are definitely included
+                    assertTrue(endPoints.contains(hosts.get((i + j + 1) % hosts.size())));
+                }
+
+                // bootstrapEndPoint should be in the endPoints for i in MAX-RF to MAX, but not in any earlier ep.
+                if (i < RING_SIZE - replicationFactor)
+                    assertFalse(endPoints.contains(bootstrapEndPoint));
+                else
+                    assertTrue(endPoints.contains(bootstrapEndPoint));
             }
-            // for 5, 15, 25 this should include bootstrap node
-            if (i < 3)
-                assertTrue(endPoints.contains(bootstrapEndPoint));
-            else
-                assertFalse(endPoints.contains(bootstrapEndPoint));
         }
+
+        StorageServiceAccessor.setTokenMetadata(oldTmd);
     }
 }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Thu Feb  4 15:21:31 2010
@@ -61,8 +61,9 @@
         if (!initialized)
         {
             LOCAL = FBUtilities.getLocalAddress();
+            tablename = DatabaseDescriptor.getTables().iterator().next();
             // bump the replication factor so that local overlaps with REMOTE below
-            DatabaseDescriptorTest.setReplicationFactor(2);
+            DatabaseDescriptorTest.setReplicationFactor(tablename, 2);
 
             StorageService.instance.initServer();
             // generate a fake endpoint for which we can spoof receiving/sending trees
@@ -72,7 +73,6 @@
             tmd.updateNormalToken(part.getMinimumToken(), REMOTE);
             assert tmd.isMember(REMOTE);
 
-            tablename = DatabaseDescriptor.getTables().iterator().next();
             cfname = Table.open(tablename).getColumnFamilies().iterator().next();
             initialized = true;
         }
@@ -89,7 +89,7 @@
     @Test
     public void testGetValidator() throws Throwable
     {
-        aes.clearNaturalRepairs();
+        aes.clearNaturalRepairs_TestsOnly();
 
         // not major
         assert aes.getValidator(tablename, cfname, null, false) instanceof NoopValidator;
@@ -174,13 +174,13 @@
         Util.writeColumnFamily(rms);
         ColumnFamilyStore store = Util.writeColumnFamily(rms);
         
-        TreePair old = aes.getRendezvousPair(tablename, cfname, REMOTE);
+        TreePair old = aes.getRendezvousPair_TestsOnly(tablename, cfname, REMOTE);
         // force a readonly compaction, and wait for it to finish
         CompactionManager.instance.submitReadonly(store, REMOTE).get(5000, TimeUnit.MILLISECONDS);
 
         // check that a tree was created and stored
         flushAES().get(5000, TimeUnit.MILLISECONDS);
-        assert old != aes.getRendezvousPair(tablename, cfname, REMOTE);
+        assert old != aes.getRendezvousPair_TestsOnly(tablename, cfname, REMOTE);
     }
 
     @Test
@@ -200,7 +200,7 @@
         
         // confirm that our reference is not equal to the original due
         // to (de)serialization
-        assert tree != aes.getRendezvousPair(tablename, cfname, REMOTE).left;
+        assert tree != aes.getRendezvousPair_TestsOnly(tablename, cfname, REMOTE).left;
     }
 
     @Test