You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/21 16:18:19 UTC

svn commit: r828029 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/locator/

Author: jbellis
Date: Wed Oct 21 14:18:18 2009
New Revision: 828029

URL: http://svn.apache.org/viewvc?rev=828029&view=rev
Log:
make write targets computable independent of replication strategy (i.e., make getReadStorageEndPoints the only method a Strategy needs to implement).  we do this by computing the token/endpoint -> Range[] map from the read endpoints, then using that to determine if a bootstrapping node needs to receive a write (if the token being written falls in any of its ranges).  Also, make the StorageProxy insert methods aware that bootstrap can entail having extra write targets temporarily, and include those in its consistencylevel calculations.
patch by jbellis; reviewed by Sandeep Tata for CASSANDRA-497

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=828029&r1=828028&r2=828029&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Oct 21 14:18:18 2009
@@ -757,7 +757,7 @@
      * @return
      * @throws IOException
      */
-    List<SSTableReader> doFileAntiCompaction(Collection<SSTableReader> sstables, List<Range> ranges, EndPoint target) throws IOException
+    List<SSTableReader> doFileAntiCompaction(Collection<SSTableReader> sstables, Collection<Range> ranges, EndPoint target) throws IOException
     {
         logger_.info("AntiCompacting [" + StringUtils.join(sstables, ",") + "]");
         // Calculate the expected compacted filesize

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java?rev=828029&r1=828028&r2=828029&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java Wed Oct 21 14:18:18 2009
@@ -129,7 +129,7 @@
     }
     
 
-    public static boolean isTokenInRanges(Token token, List<Range> ranges)
+    public static boolean isTokenInRanges(Token token, Iterable<Range> ranges)
     {
         assert ranges != null;
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=828029&r1=828028&r2=828029&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Wed Oct 21 14:18:18 2009
@@ -24,6 +24,7 @@
 
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.service.StorageService;
@@ -50,19 +51,58 @@
         storagePort_ = storagePort;
     }
 
-    public abstract EndPoint[] getWriteStorageEndPoints(Token token);
     public abstract EndPoint[] getReadStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap);
-    public abstract EndPoint[] getReadStorageEndPoints(Token token);
 
+    public EndPoint[] getReadStorageEndPoints(Token token)
+    {
+        return getReadStorageEndPoints(token, tokenMetadata_.cloneTokenEndPointMap());
+    }
+    
     /*
      * This method returns the hint map. The key is the endpoint
      * on which the data is being placed and the value is the
-     * endpoint which is in the top N.
-     * Get the map of top N to the live nodes currently.
+     * endpoint to which it should be forwarded.
      */
-    public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token)
+    public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token, EndPoint[] naturalEndpoints)
     {
-        return getHintedMapForEndpoints(getWriteStorageEndPoints(token));
+        return getHintedMapForEndpoints(getWriteStorageEndPoints(token, naturalEndpoints));
+    }
+
+    /**
+     * write endpoints may be different from read endpoints, because read endpoints only need care about the
+     * "natural" nodes for a token, but write endpoints also need to account for nodes that are bootstrapping
+     * into the ring, and write data there too so that they stay up to date during the bootstrap process.
+     * Thus, this method may return more nodes than the Replication Factor.
+     *
+     * Only ReplicationStrategy should care about this method (higher level users should only ask for Hinted).
+     */
+    protected EndPoint[] getWriteStorageEndPoints(Token token, EndPoint[] naturalEndpoints)
+    {
+        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        Map<Token, EndPoint> bootstrapTokensToEndpointMap = tokenMetadata_.cloneBootstrapNodes();
+        ArrayList<EndPoint> list = new ArrayList<EndPoint>(Arrays.asList(naturalEndpoints));
+        for (Token t : bootstrapTokensToEndpointMap.keySet())
+        {
+            EndPoint ep = bootstrapTokensToEndpointMap.get(t);
+            tokenToEndPointMap.put(t, ep);
+            try
+            {
+                for (Range r : getRangeMap(tokenToEndPointMap).get(ep))
+                {
+                    if (r.contains(token))
+                    {
+                        list.add(ep);
+                        break;
+                    }
+                }
+            }
+            finally
+            {
+                tokenToEndPointMap.remove(t);
+            }
+        }
+        retrofitPorts(list);
+        return list.toArray(new EndPoint[list.size()]);
     }
 
     /*
@@ -125,4 +165,53 @@
         }
         return map;
     }
+
+    // TODO this is pretty inefficient.
+    // fixing this probably requires merging tokenmetadata into replicationstrategy, so we can cache/invalidate cleanly
+    protected Map<EndPoint, Set<Range>> getRangeMap(Map<Token, EndPoint> tokenMap)
+    {
+        Map<EndPoint, Set<Range>> map = new HashMap<EndPoint, Set<Range>>();
+
+        for (EndPoint ep : tokenMap.values())
+        {
+            map.put(ep, new HashSet<Range>());
+        }
+
+        for (Token token : tokenMap.keySet())
+        {
+            Range range = getPrimaryRangeFor(token, tokenMap);
+            for (EndPoint ep : getReadStorageEndPoints(token, tokenMap))
+            {
+                map.get(ep).add(range);
+            }
+        }
+
+        return map;
+    }
+
+    public Map<EndPoint, Set<Range>> getRangeMap()
+    {
+        return getRangeMap(tokenMetadata_.cloneTokenEndPointMap());
+    }
+
+    public Range getPrimaryRangeFor(Token right, Map<Token, EndPoint> tokenToEndPointMap)
+    {
+        return new Range(getPredecessor(right, tokenToEndPointMap), right);
+    }
+
+    public Token getPredecessor(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+    {
+        List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
+        Collections.sort(tokens);
+        int index = Collections.binarySearch(tokens, token);
+        return (Token) (index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(--index));
+    }
+
+    public Token getSuccessor(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+    {
+        List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
+        Collections.sort(tokens);
+        int index = Collections.binarySearch(tokens, token);
+        return (Token) ((index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(++index));
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=828029&r1=828028&r2=828029&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Wed Oct 21 14:18:18 2009
@@ -44,14 +44,13 @@
         super(tokenMetadata, partitioner, replicas, storagePort);
     }
 
-    public EndPoint[] getReadStorageEndPoints(Token token)
+    public EndPoint[] getReadStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
     {
         int startIndex;
         List<EndPoint> list = new ArrayList<EndPoint>();
         boolean bDataCenter = false;
         boolean bOtherRack = false;
         int foundCount = 0;
-        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
         List tokens = new ArrayList(tokenToEndPointMap.keySet());
         Collections.sort(tokens);
         int index = Collections.binarySearch(tokens, token);
@@ -121,20 +120,4 @@
         retrofitPorts(list);
         return list.toArray(new EndPoint[list.size()]);
     }
-    
-    public EndPoint[] getReadStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
-    {
-        throw new UnsupportedOperationException("This operation is not currently supported");
-    }
-
-    public EndPoint[] getWriteStorageEndPoints(Token token)
-    {
-        throw new UnsupportedOperationException("Rack-aware bootstrapping not supported");
-    }
-
-    
-    public Map<EndPoint, EndPoint> getHintedStorageEndPointsForWrite(Token token)
-    {
-        throw new UnsupportedOperationException("Rack-aware bootstrapping not supported");
-    }
 }
\ No newline at end of file

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=828029&r1=828028&r2=828029&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java Wed Oct 21 14:18:18 2009
@@ -40,52 +40,14 @@
         super(tokenMetadata, partitioner, replicas, storagePort);
     }
 
-    public EndPoint[] getReadStorageEndPoints(Token token)
-    {
-        return getReadStorageEndPoints(token, tokenMetadata_.cloneTokenEndPointMap());
-    }
-    
-    public EndPoint[] getWriteStorageEndPoints(Token token)
-    {
-        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        Map<Token, EndPoint> bootstrapTokensToEndpointMap = tokenMetadata_.cloneBootstrapNodes();
-        List<Token> tokenList = getStorageTokens(token, tokenToEndPointMap, bootstrapTokensToEndpointMap);
-        List<EndPoint> list = new ArrayList<EndPoint>();
-        for (Token t: tokenList)
-        {
-            EndPoint e = tokenToEndPointMap.get(t);
-            if (e == null) 
-                e = bootstrapTokensToEndpointMap.get(t); 
-            assert e != null;
-            list.add(e);
-        }
-        retrofitPorts(list);
-        return list.toArray(new EndPoint[list.size()]);            
-    }
-    
     public EndPoint[] getReadStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
     {
-        List<Token> tokenList = getStorageTokens(token, tokenToEndPointMap, null);
-        List<EndPoint> list = new ArrayList<EndPoint>();
-        for (Token t: tokenList)
-            list.add(tokenToEndPointMap.get(t));
-        retrofitPorts(list);
-        return list.toArray(new EndPoint[list.size()]);
-    }
-
-    private List<Token> getStorageTokens(Token token, Map<Token, EndPoint> tokenToEndPointMap, Map<Token, EndPoint> bootStrapTokenToEndPointMap)
-    {
         int startIndex;
         List<Token> tokenList = new ArrayList<Token>();
         int foundCount = 0;
         List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
         List<Token> bsTokens = null;
-        
-        if (bootStrapTokenToEndPointMap != null)
-        {
-            bsTokens = new ArrayList<Token>(bootStrapTokenToEndPointMap.keySet());
-            tokens.addAll(bsTokens);
-        }
+
         Collections.sort(tokens);
         int index = Collections.binarySearch(tokens, token);
         if(index < 0)
@@ -112,6 +74,10 @@
                     foundCount++;
             }
         }
-        return tokenList;
+        List<EndPoint> list = new ArrayList<EndPoint>();
+        for (Token t: tokenList)
+            list.add(tokenToEndPointMap.get(t));
+        retrofitPorts(list);
+        return list.toArray(new EndPoint[list.size()]);
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=828029&r1=828028&r2=828029&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Wed Oct 21 14:18:18 2009
@@ -244,4 +244,17 @@
         
         return sb.toString();
     }
+
+    public EndPoint getEndPoint(Token token)
+    {
+        lock_.readLock().lock();
+        try
+        {
+            return tokenToEndPointMap_.get(token);
+        }
+        finally
+        {
+            lock_.readLock().unlock();
+        }
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=828029&r1=828028&r2=828029&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Oct 21 14:18:18 2009
@@ -113,8 +113,8 @@
         long startTime = System.currentTimeMillis();
 		try
 		{
-            // (This is the ZERO consistency level, so user doesn't care if we don't really have N destinations available.)
-			Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getHintedStorageEndpointMap(rm.key());
+            EndPoint[] naturalEndpoints = StorageService.instance().getReadStorageEndPoints(rm.key());
+			Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getHintedStorageEndpointMap(rm.key(), naturalEndpoints);
 			Map<EndPoint, Message> messageMap = createWriteMessages(rm, endpointMap);
 			for (Map.Entry<EndPoint, Message> entry : messageMap.entrySet())
 			{
@@ -149,8 +149,9 @@
         }
         try
         {
-            Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getHintedStorageEndpointMap(rm.key());
-            int blockFor = determineBlockFor(consistency_level);
+            EndPoint[] naturalEndpoints = StorageService.instance().getReadStorageEndPoints(rm.key());
+			Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getHintedStorageEndpointMap(rm.key(), naturalEndpoints);
+            int blockFor = determineBlockFor(naturalEndpoints.length, endpointMap.size(), consistency_level);
             List<EndPoint> primaryNodes = getUnhintedNodes(endpointMap);
             if (primaryNodes.size() < blockFor) // guarantee blockFor = W live nodes.
             {
@@ -199,20 +200,21 @@
         return liveEndPoints;
     }
 
-    private static int determineBlockFor(int consistency_level)
+    private static int determineBlockFor(int naturalTargets, int hintedTargets, int consistency_level)
     {
+        int bootstrapTargets = hintedTargets - naturalTargets;
         int blockFor;
         if (consistency_level == ConsistencyLevel.ONE)
         {
-            blockFor = 1;
+            blockFor = 1 + bootstrapTargets;
         }
         else if (consistency_level == ConsistencyLevel.QUORUM)
         {
-            blockFor = (DatabaseDescriptor.getReplicationFactor() / 2) + 1;
+            blockFor = (naturalTargets / 2) + 1 + bootstrapTargets;
         }
         else if (consistency_level == ConsistencyLevel.ALL)
         {
-            blockFor = DatabaseDescriptor.getReplicationFactor();
+            blockFor = naturalTargets + bootstrapTargets;
         }
         else
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=828029&r1=828028&r2=828029&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Oct 21 14:18:18 2009
@@ -95,7 +95,7 @@
         return partitioner_;
     }
 
-    public List<Range> getLocalRanges()
+    public Set<Range> getLocalRanges()
     {
         return getRangesForEndPoint(getLocalStorageEndPoint());
     }
@@ -384,23 +384,6 @@
           logger_.debug("Done constructing range to endpoint map ...");
         return rangeToEndPointMap;
     }
-    
-    /**
-     * Construct a mapping from endpoint to ranges that endpoint is
-     * responsible for.
-     * @return the mapping from endpoint to the ranges it is responsible
-     * for.
-     */
-    public Map<EndPoint, List<Range>> constructEndPointToRangesMap()
-    {
-        Map<EndPoint, List<Range>> endPointToRangesMap = new HashMap<EndPoint, List<Range>>();
-        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        for (EndPoint mbr : tokenToEndPointMap.values())
-        {
-            endPointToRangesMap.put(mbr, getRangesForEndPoint(mbr));
-        }
-        return endPointToRangesMap;
-    }
 
     /**
      *  Called when there is a change in application state. In particular
@@ -531,31 +514,6 @@
         /* Remove the state from the Gossiper */
         Gossiper.instance().removeFromMembership(endpoint);
     }
-    
-    /*
-     * This method is invoked by the Loader process to force the
-     * node to move from its current position on the token ring, to
-     * a position to be determined based on the keys. This will help
-     * all nodes to start off perfectly load balanced. The array passed
-     * in is evaluated as follows by the loader process:
-     * If there are 10 keys in the system and a totality of 5 nodes
-     * then each node needs to have 2 keys i.e the array is made up
-     * of every 2nd key in the total list of keys.
-    */
-    public void relocate(String[] keys) throws IOException
-    {
-    	if ( keys.length > 0 )
-    	{
-            Token token = tokenMetadata_.getToken(StorageService.tcpAddr_);
-	        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-	        Token[] tokens = tokenToEndPointMap.keySet().toArray(new Token[tokenToEndPointMap.keySet().size()]);
-	        Arrays.sort(tokens);
-	        int index = Arrays.binarySearch(tokens, token) * (keys.length/tokens.length);
-            Token newToken = partitioner_.getToken(keys[index]);
-	        /* update the token */
-	        updateToken(newToken);
-    	}
-    }
 
     /**
      * Deliver hints to the specified node when it has crashed
@@ -572,17 +530,6 @@
     }
 
     /* This methods belong to the MBean interface */
-    
-    public String getToken(EndPoint ep)
-    {
-        // render a String representation of the Token corresponding to this endpoint
-        // for a human-facing UI.  If there is no such Token then we use "" since
-        // it is not a valid value either for BigIntegerToken or StringToken.
-        EndPoint ep2 = new EndPoint(ep.getHost(), DatabaseDescriptor.getStoragePort());
-        Token token = tokenMetadata_.getToken(ep2);
-        // if there is no token for an endpoint, return an empty string to denote that
-        return ( token == null ) ? "" : token.toString();
-    }
 
     public String getToken()
     {
@@ -760,13 +707,7 @@
     EndPoint getPredecessor(EndPoint ep)
     {
         Token token = tokenMetadata_.getToken(ep);
-        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
-        Collections.sort(tokens);
-        int index = Collections.binarySearch(tokens, token);
-        return (index == 0) ? tokenToEndPointMap.get(tokens
-                .get(tokens.size() - 1)) : tokenToEndPointMap.get(tokens
-                .get(--index));
+        return tokenMetadata_.getEndPoint(replicationStrategy_.getPredecessor(token, tokenMetadata_.cloneTokenEndPointMap()));
     }
 
     /*
@@ -776,13 +717,7 @@
     public EndPoint getSuccessor(EndPoint ep)
     {
         Token token = tokenMetadata_.getToken(ep);
-        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
-        Collections.sort(tokens);
-        int index = Collections.binarySearch(tokens, token);
-        return (index == (tokens.size() - 1)) ? tokenToEndPointMap
-                .get(tokens.get(0))
-                : tokenToEndPointMap.get(tokens.get(++index));
+        return tokenMetadata_.getEndPoint(replicationStrategy_.getSuccessor(token, tokenMetadata_.cloneTokenEndPointMap()));
     }
 
     /**
@@ -793,9 +728,7 @@
     public Range getPrimaryRangeForEndPoint(EndPoint ep)
     {
         Token right = tokenMetadata_.getToken(ep);
-        EndPoint predecessor = getPredecessor(ep);
-        Token left = tokenMetadata_.getToken(predecessor);
-        return new Range(left, right);
+        return replicationStrategy_.getPrimaryRangeFor(right, tokenMetadata_.cloneTokenEndPointMap());
     }
     
     /**
@@ -803,32 +736,11 @@
      * @param ep endpoint we are interested in.
      * @return ranges for the specified endpoint.
      */
-    List<Range> getRangesForEndPoint(EndPoint ep)
+    Set<Range> getRangesForEndPoint(EndPoint ep)
     {
-        List<Range> ranges = new ArrayList<Range>();
-        ranges.add( getPrimaryRangeForEndPoint(ep) );
-        
-        EndPoint predecessor = ep;
-        int count = DatabaseDescriptor.getReplicationFactor() - 1;
-        for ( int i = 0; i < count; ++i )
-        {
-            predecessor = getPredecessor(predecessor);
-            ranges.add( getPrimaryRangeForEndPoint(predecessor) );
-        }
-        
-        return ranges;
-    }
-    
-    /**
-     * Get all ranges that span the ring as per
-     * current snapshot of the token distribution.
-     * @return all ranges in sorted order.
-     */
-    public Range[] getAllRanges()
-    {
-        return getAllRanges(tokenMetadata_.cloneTokenEndPointMap().keySet());
+        return replicationStrategy_.getRangeMap().get(ep);
     }
-    
+        
     /**
      * Get all ranges that span the ring given a set
      * of tokens. All ranges are in sorted order of 
@@ -940,9 +852,9 @@
      * @param key - key for which we need to find the endpoint return value -
      * the endpoint responsible for this key
      */
-    public Map<EndPoint, EndPoint> getHintedStorageEndpointMap(String key)
+    public Map<EndPoint, EndPoint> getHintedStorageEndpointMap(String key, EndPoint[] naturalEndpoints)
     {
-        return replicationStrategy_.getHintedStorageEndPoints(partitioner_.getToken(key));
+        return replicationStrategy_.getHintedStorageEndPoints(partitioner_.getToken(key), naturalEndpoints);
     }
 
     public void retrofitPorts(List<EndPoint> eps)

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=828029&r1=828028&r2=828029&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java Wed Oct 21 14:18:18 2009
@@ -121,7 +121,7 @@
         
         for (int i = 0; i < keyTokens.length; i++)
         {
-            EndPoint[] endPoints = strategy.getWriteStorageEndPoints(keyTokens[i]);
+            EndPoint[] endPoints = strategy.getWriteStorageEndPoints(keyTokens[i], strategy.getReadStorageEndPoints(keyTokens[i]));
             assertTrue(endPoints.length >=3);
             List<EndPoint> endPointsList = Arrays.asList(endPoints);