You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/06/14 23:53:10 UTC

svn commit: r954657 - in /cassandra/trunk: src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/service/ test/conf/ test/unit/org/apache/cassandra/locator/

Author: jbellis
Date: Mon Jun 14 21:53:09 2010
New Revision: 954657

URL: http://svn.apache.org/viewvc?rev=954657&view=rev
Log:
Fix bootstrap with DSS and add endpoint caching.
patch by mdennis and jbellis for CASSANDRA-1147

Added:
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
    cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/test/conf/cassandra-rack.properties
    cassandra/trunk/test/conf/datacenters.properties
    cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/locator/RackAwareStrategyTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java

Added: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java?rev=954657&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java Mon Jun 14 21:53:09 2010
@@ -0,0 +1,45 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public abstract class AbstractEndpointSnitch implements IEndpointSnitch
+{
+    /* list of subscribers that are notified when cached values from this snitch are invalidated */
+    protected List<AbstractReplicationStrategy> subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
+
+    public void register(AbstractReplicationStrategy subscriber)
+    {
+        subscribers.add(subscriber);
+    }
+
+    protected void invalidateCachedSnitchValues()
+    {
+        for (AbstractReplicationStrategy subscriber : subscribers)
+            subscriber.invalidateCachedSnitchValues();
+    }
+
+    public abstract List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress);
+    public abstract List<InetAddress> sortByProximity(InetAddress address, List<InetAddress> addresses);
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java Mon Jun 14 21:53:09 2010
@@ -1,4 +1,3 @@
-package org.apache.cassandra.locator;
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,6 +19,7 @@ package org.apache.cassandra.locator;
  *
  */
 
+package org.apache.cassandra.locator;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -29,7 +29,7 @@ import java.util.*;
  * An endpoint snitch tells Cassandra information about network topology that it can use to route
  * requests more efficiently.
  */
-public abstract class AbstractRackAwareSnitch implements IEndpointSnitch
+public abstract class AbstractRackAwareSnitch extends AbstractEndpointSnitch
 {
     /**
      * Return the rack for which an endpoint resides in

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Mon Jun 14 21:53:09 2010
@@ -16,57 +16,79 @@
 * specific language governing permissions and limitations
 * under the License.
 */
+
 package org.apache.cassandra.locator;
 
 import java.net.InetAddress;
 import java.util.*;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.service.AbstractWriteResponseHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
 import org.apache.cassandra.service.IResponseResolver;
 import org.apache.cassandra.service.QuorumResponseHandler;
 import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 /**
  * A abstract parent for all replication strategies.
 */
 public abstract class AbstractReplicationStrategy
 {
-    protected static final Logger logger_ = LoggerFactory.getLogger(AbstractReplicationStrategy.class);
+    private static final Logger logger = LoggerFactory.getLogger(AbstractReplicationStrategy.class);
 
-    protected TokenMetadata tokenMetadata_;
-    protected final IEndpointSnitch snitch_;
+    private TokenMetadata tokenMetadata;
+    protected final IEndpointSnitch snitch;
+    private final Map<EndpointCacheKey, ArrayList<InetAddress>> cachedEndpoints;
 
     AbstractReplicationStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch)
     {
-        tokenMetadata_ = tokenMetadata;
-        snitch_ = snitch;
+        this.tokenMetadata = tokenMetadata;
+        this.snitch = snitch;
+        cachedEndpoints = new NonBlockingHashMap<EndpointCacheKey, ArrayList<InetAddress>>();
+        this.tokenMetadata.register(this);
+        if (this.snitch != null)
+            this.snitch.register(this);
     }
 
     /**
-     * get the endpoints that should store the given Token, for the given table.
+     * get the (possibly cached) endpoints that should store the given Token, for the given table.
      * Note that while the endpoints are conceptually a Set (no duplicates will be included),
-     * we return a List to avoid an extra allocation when sorting by proximity later.
+     * we return a List to avoid an extra allocation when sorting by proximity later
+     * @param searchToken the token the natural endpoints are requested for
+     * @param table the table the natural endpoints are requested for
+     * @return a copy of the natural endpoints for the given token and table
      */
-    public abstract ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table);
-
-    public ArrayList<InetAddress> getNaturalEndpoints(Token token, String table)
+    public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken, String table)
     {
-        return getNaturalEndpoints(token, tokenMetadata_, table);
+        // TODO creating a iterator object just to get the closest token is wasteful -- we do in multiple places w/ ringIterator
+        Token keyToken = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken).next();
+        EndpointCacheKey cacheKey = new EndpointCacheKey(table, keyToken);
+        ArrayList<InetAddress> endpoints = cachedEndpoints.get(cacheKey);
+        if (endpoints == null)
+        {
+            TokenMetadata tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
+            keyToken = TokenMetadata.ringIterator(tokenMetadataClone.sortedTokens(), searchToken).next();
+            cacheKey = new EndpointCacheKey(table, keyToken);
+            endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tokenMetadataClone, table));
+            cachedEndpoints.put(cacheKey, endpoints);
+        }
+
+        return new ArrayList<InetAddress>(endpoints);
     }
 
+    public abstract Set<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, String table);
+
     public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints,
                                                                 Multimap<InetAddress, InetAddress> hintedEndpoints,
                                                                 ConsistencyLevel consistencyLevel,
@@ -132,12 +154,12 @@ public abstract class AbstractReplicatio
      */
     public Collection<InetAddress> getWriteEndpoints(Token token, String table, Collection<InetAddress> naturalEndpoints)
     {
-        if (tokenMetadata_.getPendingRanges(table).isEmpty())
+        if (tokenMetadata.getPendingRanges(table).isEmpty())
             return naturalEndpoints;
 
         List<InetAddress> endpoints = new ArrayList<InetAddress>(naturalEndpoints);
 
-        for (Map.Entry<Range, Collection<InetAddress>> entry : tokenMetadata_.getPendingRanges(table).entrySet())
+        for (Map.Entry<Range, Collection<InetAddress>> entry : tokenMetadata.getPendingRanges(table).entrySet())
         {
             if (entry.getKey().contains(token))
             {
@@ -161,7 +183,7 @@ public abstract class AbstractReplicatio
         for (Token token : metadata.sortedTokens())
         {
             Range range = metadata.getPrimaryRangeFor(token);
-            for (InetAddress ep : getNaturalEndpoints(token, metadata, table))
+            for (InetAddress ep : calculateNaturalEndpoints(token, metadata, table))
             {
                 map.put(ep, range);
             }
@@ -177,7 +199,7 @@ public abstract class AbstractReplicatio
         for (Token token : metadata.sortedTokens())
         {
             Range range = metadata.getPrimaryRangeFor(token);
-            for (InetAddress ep : getNaturalEndpoints(token, metadata, table))
+            for (InetAddress ep : calculateNaturalEndpoints(token, metadata, table))
             {
                 map.put(range, ep);
             }
@@ -188,7 +210,7 @@ public abstract class AbstractReplicatio
 
     public Multimap<InetAddress, Range> getAddressRanges(String table)
     {
-        return getAddressRanges(tokenMetadata_, table);
+        return getAddressRanges(tokenMetadata, table);
     }
 
     public Collection<Range> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress, String table)
@@ -202,4 +224,25 @@ public abstract class AbstractReplicatio
     {
         return new QuorumResponseHandler(responseResolver, consistencyLevel, table);
     }
+
+    protected static class EndpointCacheKey extends Pair<String, Token>
+    {
+        public EndpointCacheKey(String table, Token keyToken) {super(table, keyToken);}
+    }
+
+    protected void clearCachedEndpoints()
+    {
+        logger.debug("clearing cached endpoints");
+        cachedEndpoints.clear();
+    }
+
+    public void invalidateCachedTokenEndpointValues()
+    {
+        clearCachedEndpoints();
+    }
+
+    public void invalidateCachedSnitchValues()
+    {
+        clearCachedEndpoints();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java Mon Jun 14 21:53:09 2010
@@ -1,5 +1,3 @@
-package org.apache.cassandra.locator;
-
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -21,19 +19,21 @@ package org.apache.cassandra.locator;
  * 
  */
 
+package org.apache.cassandra.locator;
 
 import java.net.InetAddress;
 import java.util.*;
 import java.util.Map.Entry;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.collect.Multimap;
 import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.utils.ResourceWatcher;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ResourceWatcher;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 /**
@@ -52,9 +52,9 @@ import org.apache.cassandra.utils.Wrappe
 public class DatacenterShardStrategy extends AbstractReplicationStrategy
 {
     private static final String DATACENTER_PROPERTY_FILENAME = "datacenters.properties";
-    private Map<String, List<Token>> dcTokens;
     private AbstractRackAwareSnitch snitch;
-    private Map<String, Map<String, Integer>> datacenters = new HashMap<String, Map<String, Integer>>();
+    private volatile Map<String, Map<String, Integer>> datacenters;
+    private static final Logger logger = LoggerFactory.getLogger(DatacenterShardStrategy.class);
 
     public DatacenterShardStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch) throws ConfigurationException
     {
@@ -72,108 +72,88 @@ public class DatacenterShardStrategy ext
             }
         };
         ResourceWatcher.watch(DATACENTER_PROPERTY_FILENAME, runnable, 60 * 1000);
-
-        loadEndpoints(tokenMetadata);
     }
 
-    public void reloadConfiguration() throws ConfigurationException
+    public synchronized void reloadConfiguration() throws ConfigurationException
     {
         Properties props = PropertyFileSnitch.resourceToProperties(DATACENTER_PROPERTY_FILENAME);
-        for (Object key : props.keySet())
+        Map<String, Map<String, Integer>> newDatacenters = new HashMap<String, Map<String, Integer>>();
+        for (Entry entry : props.entrySet())
         {
-            String[] keys = ((String)key).split(":");
-            Map<String, Integer> map = datacenters.get(keys[0]);
-            if (null == map)
-            {
+            String[] keys = ((String)entry.getKey()).split(":");
+            Map<String, Integer> map = newDatacenters.get(keys[0]);
+            if (map == null)
                 map = new HashMap<String, Integer>();
-            }
-            map.put(keys[1], Integer.parseInt((String)props.get(key)));
-            datacenters.put(keys[0], map);
+            map.put(keys[1], Integer.parseInt((String) entry.getValue()));
+            newDatacenters.put(keys[0], map);
         }
-    }
-
-    private synchronized void loadEndpoints(TokenMetadata metadata) throws ConfigurationException
-    {
-        String localDC = snitch.getDatacenter(DatabaseDescriptor.getListenAddress());
-        if (localDC == null)
-            throw new ConfigurationException("Invalid datacenter configuration; couldn't find local host " + FBUtilities.getLocalAddress());
-
-        dcTokens = new HashMap<String, List<Token>>();
-        for (Token token : metadata.sortedTokens())
-        {
-            InetAddress endPoint = metadata.getEndpoint(token);
-            String dataCenter = snitch.getDatacenter(endPoint);
-            // add tokens to dcmap.
-            List<Token> lst = dcTokens.get(dataCenter);
-            if (lst == null)
+        datacenters = Collections.unmodifiableMap(newDatacenters);
+        logger.info(DATACENTER_PROPERTY_FILENAME + " changed, clearing endpoint cache");
+        clearCachedEndpoints();
+    }
+
+    public Set<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, String table)
+    {
+        int totalReplicas = getReplicationfactor(table);
+        Map<String, Integer> remainingReplicas = new HashMap<String, Integer>(datacenters.get(table));
+        Map<String, Set<String>> dcUsedRacks = new HashMap<String, Set<String>>();
+        Set<InetAddress> endpoints = new HashSet<InetAddress>(totalReplicas);
+
+        // first pass: only collect replicas on unique racks
+        for (Iterator<Token> iter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken);
+             endpoints.size() < totalReplicas && iter.hasNext();)
+        {
+            Token token = iter.next();
+            InetAddress endpoint = tokenMetadata.getEndpoint(token);
+            String datacenter = snitch.getDatacenter(endpoint);
+            int remaining = remainingReplicas.containsKey(datacenter) ? remainingReplicas.get(datacenter) : 0;
+            if (remaining > 0)
             {
-                lst = new ArrayList<Token>();
-            }
-            lst.add(token);
-            dcTokens.put(dataCenter, lst);
-        }
-        for (Entry<String, List<Token>> entry : dcTokens.entrySet())
-        {
-            List<Token> valueList = entry.getValue();
-            Collections.sort(valueList);
-            dcTokens.put(entry.getKey(), valueList);
-        }
-
-        // TODO verify that each DC has enough endpoints for the desired RF
-    }
-
-    public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken, TokenMetadata metadata, String table)
-    {
-        ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
-
-        if (metadata.sortedTokens().isEmpty())
-            return endpoints;
-
-        for (String dc : dcTokens.keySet())
-        {
-            List<Token> tokens = dcTokens.get(dc);
-            Set<String> racks = new HashSet<String>();
-            // Add the node at the index by default
-            Iterator<Token> iter = TokenMetadata.ringIterator(tokens, searchToken);
-            InetAddress initialDCHost = metadata.getEndpoint(iter.next());
-            assert initialDCHost != null;
-            endpoints.add(initialDCHost);
-            racks.add(snitch.getRack(initialDCHost));
-
-            // find replicas on unique racks
-            int replicas = getReplicationFactor(dc, table);
-            int localEndpoints = 1;
-            while (localEndpoints < replicas && iter.hasNext())
-            {
-                Token t = iter.next();
-                InetAddress endpoint = metadata.getEndpoint(t);
-                if (!racks.contains(snitch.getRack(endpoint)))
+                Set<String> usedRacks = dcUsedRacks.get(datacenter);
+                if (usedRacks == null)
+                {
+                    usedRacks = new HashSet<String>();
+                    dcUsedRacks.put(datacenter, usedRacks);
+                }
+                String rack = snitch.getRack(endpoint);
+                if (!usedRacks.contains(rack))
                 {
                     endpoints.add(endpoint);
-                    localEndpoints++;
+                    usedRacks.add(rack);
+                    remainingReplicas.put(datacenter, remaining - 1);
                 }
             }
+        }
 
-            if (localEndpoints == replicas)
+        // 2nd pass: if replica count has not been achieved from unique racks, add nodes from the same racks
+        for (Iterator<Token> iter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken);
+             endpoints.size() < totalReplicas && iter.hasNext();)
+        {
+            Token token = iter.next();
+            InetAddress endpoint = tokenMetadata.getEndpoint(token);
+            if (endpoints.contains(endpoint))
                 continue;
 
-            // if not enough unique racks were found, re-loop and add other endpoints
-            iter = TokenMetadata.ringIterator(tokens, searchToken);
-            iter.next(); // skip the first one since we already know it's used
-            while (localEndpoints < replicas && iter.hasNext())
+            String datacenter = snitch.getDatacenter(endpoint);
+            int remaining = remainingReplicas.containsKey(datacenter) ? remainingReplicas.get(datacenter) : 0;
+            if (remaining > 0)
             {
-                Token t = iter.next();
-                if (!endpoints.contains(metadata.getEndpoint(t)))
-                {
-                    localEndpoints++;
-                    endpoints.add(metadata.getEndpoint(t));
-                }
+                endpoints.add(endpoint);
+                remainingReplicas.put(datacenter, remaining - 1);
             }
         }
 
         return endpoints;
     }
 
+    public int getReplicationfactor(String table)
+    {
+        int total = 0;
+        for (int repFactor : datacenters.get(table).values())
+            total += repFactor;
+        return total;
+    }
+
     public int getReplicationFactor(String dc, String table)
     {
         return datacenters.get(table).get(dc);

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java Mon Jun 14 21:53:09 2010
@@ -18,13 +18,9 @@
 
 package org.apache.cassandra.locator;
 
-import java.net.UnknownHostException;
-
 import java.net.InetAddress;
-import java.util.Set;
-import java.util.List;
 import java.util.Collection;
-
+import java.util.List;
 
 /**
  * This interface helps determine location of node in the data center relative to another node.
@@ -43,5 +39,10 @@ public interface IEndpointSnitch
      * This method will sort the <tt>List</tt> by proximity to the given address.
      */
     public List<InetAddress> sortByProximity(InetAddress address, List<InetAddress> addresses);
-}
 
+    /**
+     * register to receive notification when the endpoint snitch has changed the answers it was providing.
+     * @param subscriber the subscriber to notify
+     */
+    public void register(AbstractReplicationStrategy subscriber);
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java Mon Jun 14 21:53:09 2010
@@ -24,23 +24,25 @@ import java.net.InetAddress;
 import java.util.Properties;
 import java.util.StringTokenizer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ResourceWatcher;
 import org.apache.cassandra.utils.WrappedRunnable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Used to determine if two IP's are in the same datacenter or on the same rack.
  * <p/>
  * Based on a properties file configuration.
  */
-public class PropertyFileSnitch extends AbstractRackAwareSnitch {
+public class PropertyFileSnitch extends AbstractRackAwareSnitch
+{
     /**
      * A list of properties with keys being host:port and values being datacenter:rack
      */
-    private volatile Properties hostProperties = new Properties();
+    private volatile Properties hostProperties;
 
     /**
      * The default rack property file to be read.
@@ -112,6 +114,7 @@ public class PropertyFileSnitch extends 
     public void reloadConfiguration() throws ConfigurationException
     {
         hostProperties = resourceToProperties(RACK_PROPERTY_FILENAME);
+        invalidateCachedSnitchValues();
     }
 
     public static Properties resourceToProperties(String filename) throws ConfigurationException

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Mon Jun 14 21:53:09 2010
@@ -16,16 +16,14 @@
 * specific language governing permissions and limitations
 * under the License.
 */
+
 package org.apache.cassandra.locator;
 
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+import java.net.InetAddress;
+import java.util.*;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Token;
-import java.net.InetAddress;
 
 /*
  * This Replication Strategy returns the nodes responsible for a given
@@ -43,10 +41,10 @@ public class RackAwareStrategy extends A
             throw new IllegalArgumentException(("RackAwareStrategy requires AbstractRackAwareSnitch."));
     }
 
-    public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table)
+    public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table)
     {
         int replicas = DatabaseDescriptor.getReplicationFactor(table);
-        ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(replicas);
+        Set<InetAddress> endpoints = new HashSet<InetAddress>(replicas);
         List<Token> tokens = metadata.sortedTokens();
 
         if (tokens.isEmpty())
@@ -60,7 +58,7 @@ public class RackAwareStrategy extends A
         boolean bOtherRack = false;
         while (endpoints.size() < replicas && iter.hasNext())
         {
-            AbstractRackAwareSnitch snitch = (AbstractRackAwareSnitch)snitch_;
+            AbstractRackAwareSnitch snitch = (AbstractRackAwareSnitch) this.snitch;
 
             // First try to find one in a different data center
             Token t = iter.next();

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java Mon Jun 14 21:53:09 2010
@@ -16,15 +16,14 @@
 * specific language governing permissions and limitations
 * under the License.
 */
+
 package org.apache.cassandra.locator;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+import java.net.InetAddress;
+import java.util.*;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Token;
-import java.net.InetAddress;
 
 /**
  * This class returns the nodes responsible for a given
@@ -39,11 +38,11 @@ public class RackUnawareStrategy extends
         super(tokenMetadata, snitch);
     }
 
-    public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table)
+    public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table)
     {
         int replicas = DatabaseDescriptor.getReplicationFactor(table);
         List<Token> tokens = metadata.sortedTokens();
-        ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(replicas);
+        Set<InetAddress> endpoints = new HashSet<InetAddress>(replicas);
 
         if (tokens.isEmpty())
             return endpoints;

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java Mon Jun 14 21:53:09 2010
@@ -19,18 +19,19 @@
 package org.apache.cassandra.locator;
 
 import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
 /**
  * A simple endpoint snitch implementation does not sort addresses by
  * proximity.
  */
-public class SimpleSnitch implements IEndpointSnitch
+public class SimpleSnitch extends AbstractEndpointSnitch
 {
     public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses)
     {
-        List<InetAddress> list = new ArrayList<InetAddress>(addresses);
-        return list;
+        return new ArrayList<InetAddress>(addresses);
     }
 
     public List<InetAddress> sortByProximity(final InetAddress address, List<InetAddress> addresses)

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Mon Jun 14 21:53:09 2010
@@ -18,23 +18,23 @@
 
 package org.apache.cassandra.locator;
 
+import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import com.google.common.collect.*;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.dht.Range;
-
-import java.net.InetAddress;
-
 import org.apache.commons.lang.StringUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.*;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
 public class TokenMetadata
 {
     private static Logger logger = LoggerFactory.getLogger(TokenMetadata.class);
@@ -64,6 +64,9 @@ public class TokenMetadata
     private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
     private List<Token> sortedTokens;
 
+    /* list of subscribers that are notified when the tokenToEndpointMap changed */
+    private final CopyOnWriteArrayList<AbstractReplicationStrategy> subscribers;
+
     public TokenMetadata()
     {
         this(null);
@@ -78,6 +81,7 @@ public class TokenMetadata
         leavingEndpoints = new HashSet<InetAddress>();
         pendingRanges = new ConcurrentHashMap<String, Multimap<Range, InetAddress>>();
         sortedTokens = sortTokens();
+        subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
     }
 
     private List<Token> sortTokens()
@@ -116,6 +120,7 @@ public class TokenMetadata
                 sortedTokens = sortTokens();
             }
             leavingEndpoints.remove(endpoint);
+            fireTokenToEndpointMapChanged();
         }
         finally
         {
@@ -205,6 +210,7 @@ public class TokenMetadata
             tokenToEndpointMap.inverse().remove(endpoint);
             leavingEndpoints.remove(endpoint);
             sortedTokens = sortTokens();
+            fireTokenToEndpointMapChanged();
         }
         finally
         {
@@ -454,6 +460,7 @@ public class TokenMetadata
         tokenToEndpointMap.clear();
         leavingEndpoints.clear();
         pendingRanges.clear();
+        fireTokenToEndpointMapChanged();
     }
 
     public String toString()
@@ -529,4 +536,17 @@ public class TokenMetadata
 
         return sb.toString();
     }
+
+    protected void fireTokenToEndpointMapChanged()
+    {
+        for (AbstractReplicationStrategy subscriber : subscribers)
+        {
+            subscriber.invalidateCachedTokenEndpointValues();
+        }
+    }
+
+    public void register(AbstractReplicationStrategy subscriber)
+    {
+        subscribers.add(subscriber);
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Jun 14 21:53:09 2010
@@ -22,20 +22,24 @@ import java.io.IOError;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
+import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.net.InetAddress;
-import javax.management.*;
+import java.util.concurrent.*;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
-import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
@@ -43,27 +47,25 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.migration.AddKeyspace;
 import org.apache.cassandra.db.migration.Migration;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.DeletionService;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.locator.*;
-import org.apache.cassandra.net.*;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ResponseVerbHandler;
 import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.cassandra.io.util.FileUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.log4j.Level;
-import org.apache.commons.lang.StringUtils;
-
-import com.google.common.collect.Multimap;
-import com.google.common.collect.HashMultimap;
 
 /*
  * This abstraction contains the token/identifier of this node
@@ -764,8 +766,8 @@ public class StorageService implements I
         // all leaving nodes are gone.
         for (Range range : affectedRanges)
         {
-            List<InetAddress> currentEndpoints = strategy.getNaturalEndpoints(range.right, tm, table);
-            List<InetAddress> newEndpoints = strategy.getNaturalEndpoints(range.right, allLeftMetadata, table);
+            Set<InetAddress> currentEndpoints = strategy.calculateNaturalEndpoints(range.right, tm, table);
+            Set<InetAddress> newEndpoints = strategy.calculateNaturalEndpoints(range.right, allLeftMetadata, table);
             newEndpoints.removeAll(currentEndpoints);
             pendingRanges.putAll(range, newEndpoints);
         }
@@ -870,11 +872,11 @@ public class StorageService implements I
         if (logger_.isDebugEnabled())
             logger_.debug("Node " + endpoint + " ranges [" + StringUtils.join(ranges, ", ") + "]");
 
-        Map<Range, ArrayList<InetAddress>> currentReplicaEndpoints = new HashMap<Range, ArrayList<InetAddress>>();
+        Map<Range, Set<InetAddress>> currentReplicaEndpoints = new HashMap<Range, Set<InetAddress>>();
 
         // Find (for each range) all nodes that store replicas for these ranges as well
         for (Range range : ranges)
-            currentReplicaEndpoints.put(range, getReplicationStrategy(table).getNaturalEndpoints(range.right, tokenMetadata_, table));
+            currentReplicaEndpoints.put(range, getReplicationStrategy(table).calculateNaturalEndpoints(range.right, tokenMetadata_, table));
 
         TokenMetadata temp = tokenMetadata_.cloneAfterAllLeft();
 
@@ -892,7 +894,7 @@ public class StorageService implements I
         // range.
         for (Range range : ranges)
         {
-            ArrayList<InetAddress> newReplicaEndpoints = getReplicationStrategy(table).getNaturalEndpoints(range.right, temp, table);
+            Set<InetAddress> newReplicaEndpoints = getReplicationStrategy(table).calculateNaturalEndpoints(range.right, temp, table);
             newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range));
             if (logger_.isDebugEnabled())
                 if (newReplicaEndpoints.isEmpty())

Modified: cassandra/trunk/test/conf/cassandra-rack.properties
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra-rack.properties?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra-rack.properties (original)
+++ cassandra/trunk/test/conf/cassandra-rack.properties Mon Jun 14 21:53:09 2010
@@ -32,5 +32,11 @@
 10.21.119.14=DC3:RAC2
 10.20.114.15=DC2:RAC2
 
+127.0.0.1=DC1:RAC1
+127.0.0.2=DC2:RAC2
+127.0.0.3=DC3:RAC3
+127.0.0.4=DC4:RAC4
+127.0.0.5=DC3:RAC3
+
 # default for unknown nodes
-default=DC1:r1
\ No newline at end of file
+default=DC1:r1

Modified: cassandra/trunk/test/conf/datacenters.properties
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/datacenters.properties?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/test/conf/datacenters.properties (original)
+++ cassandra/trunk/test/conf/datacenters.properties Mon Jun 14 21:53:09 2010
@@ -20,4 +20,7 @@
 # keyspace\:datacenter=replication factor
 Keyspace1\:DC1=3
 Keyspace1\:DC2=2
-Keyspace1\:DC3=1
\ No newline at end of file
+Keyspace1\:DC3=1
+Keyspace3\:DC1=3
+Keyspace3\:DC2=2
+Keyspace3\:DC3=1
\ No newline at end of file

Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java Mon Jun 14 21:53:09 2010
@@ -1,3 +1,22 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
 package org.apache.cassandra.locator;
 
 import java.io.IOException;
@@ -5,15 +24,13 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashSet;
-
 import javax.xml.parsers.ParserConfigurationException;
 
+import org.junit.Test;
+
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.dht.StringToken;
 import org.apache.cassandra.dht.Token;
-
-import org.junit.Test;
-
 import org.xml.sax.SAXException;
 
 public class DatacenterShardStrategyTest

Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/RackAwareStrategyTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackAwareStrategyTest.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/RackAwareStrategyTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/RackAwareStrategyTest.java Mon Jun 14 21:53:09 2010
@@ -16,9 +16,8 @@
 * specific language governing permissions and limitations
 * under the License.
 */
-package org.apache.cassandra.locator;
 
-import static org.junit.Assert.assertEquals;
+package org.apache.cassandra.locator;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -27,13 +26,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.BigIntegerToken;
 import org.apache.cassandra.dht.Token;
 
-import org.junit.Before;
-import org.junit.Test;
-
 public class RackAwareStrategyTest
 {
     private List<Token> endpointTokens;
@@ -160,7 +160,7 @@ public class RackAwareStrategyTest
     {
         for (Token keyToken : keyTokens)
         {
-            List<InetAddress> endpoints = strategy.getNaturalEndpoints(keyToken, tmd, table);
+            List<InetAddress> endpoints = strategy.getNaturalEndpoints(keyToken, table);
             for (int j = 0; j < endpoints.size(); j++)
             {
                 ArrayList<InetAddress> hostsExpected = expectedResults.get(keyToken.toString());

Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java Mon Jun 14 21:53:09 2010
@@ -18,28 +18,21 @@
 */
 package org.apache.cassandra.locator;
 
-import static org.junit.Assert.*;
-
-import java.util.Arrays;
-import java.util.List;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import org.junit.Test;
 
+import static org.junit.Assert.*;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.service.StorageServiceAccessor;
-import org.junit.Test;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.RandomPartitioner;
-import org.apache.cassandra.dht.BigIntegerToken;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.dht.OrderPreservingPartitioner;
-import org.apache.cassandra.dht.StringToken;
+import org.apache.cassandra.dht.*;
 import org.apache.cassandra.service.StorageService;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import org.apache.cassandra.service.StorageServiceAccessor;
 
 public class RackUnawareStrategyTest extends SchemaLoader
 {
@@ -71,8 +64,7 @@ public class RackUnawareStrategyTest ext
             endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
             keyTokens.add(new BigIntegerToken(String.valueOf(10 * i + 5)));
         }
-        for (String table : DatabaseDescriptor.getNonSystemTables())
-            testGetEndpoints(tmd, strategy, endpointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]), table);
+        verifyGetNaturalEndpoints(tmd, strategy, endpointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]));
     }
 
     @Test
@@ -88,29 +80,31 @@ public class RackUnawareStrategyTest ext
             endpointTokens.add(new StringToken(String.valueOf((char)('a' + i * 2))));
             keyTokens.add(partitioner.getToken(String.valueOf((char)('a' + i * 2 + 1)).getBytes()));
         }
-        for (String table : DatabaseDescriptor.getNonSystemTables())
-            testGetEndpoints(tmd, strategy, endpointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]), table);
+        verifyGetNaturalEndpoints(tmd, strategy, endpointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]));
     }
 
     // given a list of endpoint tokens, and a set of key tokens falling between the endpoint tokens,
     // make sure that the Strategy picks the right endpoints for the keys.
-    private void testGetEndpoints(TokenMetadata tmd, AbstractReplicationStrategy strategy, Token[] endpointTokens, Token[] keyTokens, String table) throws UnknownHostException
+    private void verifyGetNaturalEndpoints(TokenMetadata tmd, AbstractReplicationStrategy strategy, Token[] endpointTokens, Token[] keyTokens) throws UnknownHostException
     {
-        List<InetAddress> hosts = new ArrayList<InetAddress>();
-        for (int i = 0; i < endpointTokens.length; i++)
+        for (String table : DatabaseDescriptor.getNonSystemTables())
         {
-            InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
-            tmd.updateNormalToken(endpointTokens[i], ep);
-            hosts.add(ep);
-        }
+            List<InetAddress> hosts = new ArrayList<InetAddress>();
+            for (int i = 0; i < endpointTokens.length; i++)
+            {
+                InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
+                tmd.updateNormalToken(endpointTokens[i], ep);
+                hosts.add(ep);
+            }
 
-        for (int i = 0; i < keyTokens.length; i++)
-        {
-            List<InetAddress> endpoints = strategy.getNaturalEndpoints(keyTokens[i], table);
-            assertEquals(DatabaseDescriptor.getReplicationFactor(table), endpoints.size());
-            for (int j = 0; j < endpoints.size(); j++)
+            for (int i = 0; i < keyTokens.length; i++)
             {
-                assertEquals(endpoints.get(j), hosts.get((i + j + 1) % hosts.size()));
+                List<InetAddress> endpoints = strategy.getNaturalEndpoints(keyTokens[i], table);
+                assertEquals(DatabaseDescriptor.getReplicationFactor(table), endpoints.size());
+                List<InetAddress> correctEndpoints = new ArrayList<InetAddress>();
+                for (int j = 0; j < endpoints.size(); j++)
+                    correctEndpoints.add(hosts.get((i + j + 1) % hosts.size()));
+                assertEquals(new HashSet<InetAddress>(correctEndpoints), new HashSet<InetAddress>(endpoints));
             }
         }
     }

Added: cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java?rev=954657&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java Mon Jun 14 21:53:09 2010
@@ -0,0 +1,156 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.locator;
+
+import java.lang.reflect.Constructor;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.dht.BigIntegerToken;
+import org.apache.cassandra.dht.Token;
+
+public class ReplicationStrategyEndpointCacheTest extends SchemaLoader
+{
+    private TokenMetadata tmd;
+    private Token searchToken;
+    private AbstractReplicationStrategy strategy;
+
+    public void setup(Class stratClass) throws Exception
+    {
+        tmd = new TokenMetadata();
+        searchToken = new BigIntegerToken(String.valueOf(15));
+        Constructor constructor = stratClass.getConstructor(TokenMetadata.class, IEndpointSnitch.class);
+        strategy = (AbstractReplicationStrategy) constructor.newInstance(tmd, new PropertyFileSnitch());
+
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(10)), InetAddress.getByName("127.0.0.1"));
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(20)), InetAddress.getByName("127.0.0.2"));
+    }
+
+    @Test
+    public void testEndpointsWereCached() throws Exception
+    {
+        runEndpointsWereCachedTest(FakeRackUnawareStrategy.class);
+        runEndpointsWereCachedTest(FakeRackAwareStrategy.class);
+        runEndpointsWereCachedTest(FakeDatacenterShardStrategy.class);
+    }
+
+    public void runEndpointsWereCachedTest(Class stratClass) throws Exception
+    {
+        setup(stratClass);
+        assert strategy.getNaturalEndpoints(searchToken, "Keyspace3").equals(strategy.getNaturalEndpoints(searchToken, "Keyspace3"));
+    }
+
+    @Test
+    public void testCacheRespectsTokenChanges() throws Exception
+    {
+        runCacheRespectsTokenChangesTest(RackUnawareStrategy.class);
+        runCacheRespectsTokenChangesTest(RackAwareStrategy.class);
+        runCacheRespectsTokenChangesTest(DatacenterShardStrategy.class);
+    }
+
+    public void runCacheRespectsTokenChangesTest(Class stratClass) throws Exception
+    {
+        // TODO DSS is asked to provide a total of 6 replicas, but we never give it 6 endpoints.
+        // thus we are testing undefined behavior, at best.
+        setup(stratClass);
+        ArrayList<InetAddress> endpoints;
+
+        endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
+        assert endpoints.size() == 2 : StringUtils.join(endpoints, ",");
+
+        // test token addition
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.3"));
+        endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
+        assert endpoints.size() == 3 : StringUtils.join(endpoints, ",");
+
+        // test token removal
+        tmd.removeEndpoint(InetAddress.getByName("127.0.0.2"));
+        endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
+        assert endpoints.size() == 2 : StringUtils.join(endpoints, ",");
+
+        // test token change
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.5"));
+        endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
+        assert endpoints.size() == 2 : StringUtils.join(endpoints, ",");
+        assert endpoints.contains(InetAddress.getByName("127.0.0.5"));
+        assert !endpoints.contains(InetAddress.getByName("127.0.0.3"));
+    }
+
+    protected static class FakeRackUnawareStrategy extends RackUnawareStrategy
+    {
+        private boolean called = false;
+
+        public FakeRackUnawareStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch)
+        {
+            super(tokenMetadata, snitch);
+        }
+
+        @Override
+        public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table)
+        {
+            assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
+            called = true;
+            return super.calculateNaturalEndpoints(token, metadata, table);
+        }
+    }
+
+    protected static class FakeRackAwareStrategy extends RackAwareStrategy
+    {
+        private boolean called = false;
+
+        public FakeRackAwareStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch)
+        {
+            super(tokenMetadata, snitch);
+        }
+
+        @Override
+        public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table)
+        {
+            assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
+            called = true;
+            return super.calculateNaturalEndpoints(token, metadata, table);
+        }
+    }
+
+    protected static class FakeDatacenterShardStrategy extends DatacenterShardStrategy
+    {
+        private boolean called = false;
+
+        public FakeDatacenterShardStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch) throws ConfigurationException
+        {
+            super(tokenMetadata, snitch);
+        }
+
+        @Override
+        public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table)
+        {
+            assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
+            called = true;
+            return super.calculateNaturalEndpoints(token, metadata, table);
+        }
+    }
+}