You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/02/07 03:21:07 UTC

svn commit: r907368 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: locator/DatacenterShardStategy.java locator/RackAwareStrategy.java locator/RackUnawareStrategy.java locator/TokenMetadata.java service/StorageService.java

Author: jbellis
Date: Sun Feb  7 02:21:06 2010
New Revision: 907368

URL: http://svn.apache.org/viewvc?rev=907368&view=rev
Log:
add TokenMetadata.ringIterator to replace one-offs of the same functionality
patch by jbellis; reviewed by Stu Hood for CASSANDRA-771

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java?rev=907368&r1=907367&r2=907368&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java Sun Feb  7 02:21:06 2010
@@ -142,54 +142,37 @@
 
         for (String dc : dcMap.keySet())
         {
-            int foundCount = 0;
-            ArrayList<InetAddress> forloopReturn = new ArrayList<InetAddress>();
             int replicas_ = dcReplicationFactor.get(dc);
-            List tokens = dcMap.get(dc);
+            ArrayList<InetAddress> forloopReturn = new ArrayList<InetAddress>(replicas_);
+            List<Token> tokens = dcMap.get(dc);
             boolean bOtherRack = false;
             boolean doneDataCenterItr;
-            int index = Collections.binarySearch(tokens, searchToken);
-            if (index < 0)
-            {
-                index = (index + 1) * (-1);
-                if (index >= tokens.size())
-                {
-                    index = 0;
-                }
-            }
-            int totalNodes = tokens.size();
             // Add the node at the index by default
-            InetAddress primaryHost = metadata.getEndPoint((Token) tokens.get(index));
+            Iterator<Token> iter = TokenMetadata.ringIterator(tokens, searchToken);
+            InetAddress primaryHost = metadata.getEndPoint(iter.next());
             forloopReturn.add(primaryHost);
-            foundCount++;
-            if (replicas_ == 1)
-            {
-                continue;
-            }
 
-            int startIndex = (index + 1) % totalNodes;
-            for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas_; ++count, i = (i + 1) % totalNodes)
+            while (forloopReturn.size() < replicas_ && iter.hasNext())
             {
-                InetAddress endPointOfIntrest = metadata.getEndPoint((Token) tokens.get(i));
-                if ((replicas_ - 1) > foundCount)
+                Token t = iter.next();
+                InetAddress endPointOfIntrest = metadata.getEndPoint(t);
+                if (forloopReturn.size() < replicas_ - 1)
                 {
                     forloopReturn.add(endPointOfIntrest);
-                    foundCount++;
                     continue;
                 }
                 else
                 {
                     doneDataCenterItr = true;
                 }
-                
+
                 // Now try to find one on a different rack
                 if (!bOtherRack)
                 {
                     if (!((DatacenterEndPointSnitch)snitch_).isOnSameRack(primaryHost, endPointOfIntrest))
                     {
-                        forloopReturn.add(metadata.getEndPoint((Token) tokens.get(i)));
+                        forloopReturn.add(metadata.getEndPoint(t));
                         bOtherRack = true;
-                        foundCount++;
                     }
                 }
                 // If both already found exit loop.
@@ -204,13 +187,16 @@
             * exit. Otherwise just loop through the list and add until we
             * have N nodes.
             */
-            for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas_; ++count, i = (i + 1) % totalNodes)
+            if (forloopReturn.size() < replicas_)
             {
-                Token t = (Token) tokens.get(i);
-                if (!forloopReturn.contains(metadata.getEndPoint(t)))
+                iter = TokenMetadata.ringIterator(tokens, searchToken);
+                while (forloopReturn.size() < replicas_ && iter.hasNext())
                 {
-                    forloopReturn.add(metadata.getEndPoint(t));
-                    foundCount++;
+                    Token t = iter.next();
+                    if (!forloopReturn.contains(metadata.getEndPoint(t)))
+                    {
+                        forloopReturn.add(metadata.getEndPoint(t));
+                    }
                 }
             }
             endpoints.addAll(forloopReturn);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=907368&r1=907367&r2=907368&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Sun Feb  7 02:21:06 2010
@@ -21,6 +21,7 @@
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -46,41 +47,25 @@
 
     public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table)
     {
-        int startIndex;
-        ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
-        boolean bDataCenter = false;
-        boolean bOtherRack = false;
-        int foundCount = 0;
-        List tokens = metadata.sortedTokens();
+        int replicas = DatabaseDescriptor.getReplicationFactor(table);
+        ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(replicas);
+        List<Token> tokens = metadata.sortedTokens();
 
         if (tokens.isEmpty())
             return endpoints;
 
-        int index = Collections.binarySearch(tokens, token);
-        if(index < 0)
-        {
-            index = (index + 1) * (-1);
-            if (index >= tokens.size())
-                index = 0;
-        }
-        int totalNodes = tokens.size();
-        // Add the node at the index by default
-        Token primaryToken = (Token) tokens.get(index);
+        Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token);
+        Token primaryToken = iter.next();
         endpoints.add(metadata.getEndPoint(primaryToken));
-        foundCount++;
-        final int replicas = DatabaseDescriptor.getReplicationFactor(table);
-        if (replicas == 1)
-        {
-            return endpoints;
-        }
-        startIndex = (index + 1)%totalNodes;
 
-        for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas; ++count, i = (i + 1) % totalNodes)
+        boolean bDataCenter = false;
+        boolean bOtherRack = false;
+        while (endpoints.size() < replicas && iter.hasNext())
         {
             try
             {
                 // First try to find one in a different data center
-                Token t = (Token) tokens.get(i);
+                Token t = iter.next();
                 if (!((EndPointSnitch)snitch_).isInSameDataCenter(metadata.getEndPoint(primaryToken), metadata.getEndPoint(t)))
                 {
                     // If we have already found something in a diff datacenter no need to find another
@@ -88,7 +73,6 @@
                     {
                         endpoints.add(metadata.getEndPoint(t));
                         bDataCenter = true;
-                        foundCount++;
                     }
                     continue;
                 }
@@ -101,7 +85,6 @@
                     {
                         endpoints.add(metadata.getEndPoint(t));
                         bOtherRack = true;
-                        foundCount++;
                     }
                 }
             }
@@ -111,17 +94,20 @@
             }
 
         }
+
         // If we found N number of nodes we are good. This loop wil just exit. Otherwise just
         // loop through the list and add until we have N nodes.
-        for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas; ++count, i = (i+1)%totalNodes)
+        if (endpoints.size() < replicas)
         {
-            Token t = (Token) tokens.get(i);
-            if (!endpoints.contains(metadata.getEndPoint(t)))
+            iter = TokenMetadata.ringIterator(tokens, token);
+            while (endpoints.size() < replicas && iter.hasNext())
             {
-                endpoints.add(metadata.getEndPoint(t));
-                foundCount++;
+                Token t = iter.next();
+                if (!endpoints.contains(metadata.getEndPoint(t)))
+                    endpoints.add(metadata.getEndPoint(t));
             }
         }
+
         return endpoints;
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=907368&r1=907367&r2=907368&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java Sun Feb  7 02:21:06 2010
@@ -20,6 +20,7 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -41,35 +42,20 @@
 
     public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table)
     {
-        int startIndex;
-        List<Token> tokenList = new ArrayList<Token>();
-        List tokens = new ArrayList<Token>(metadata.sortedTokens());
-        ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(tokenList.size());
+        int replicas = DatabaseDescriptor.getReplicationFactor(table);
+        List<Token> tokens = metadata.sortedTokens();
+        ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(replicas);
 
         if (tokens.isEmpty())
             return endpoints;
 
-        int index = Collections.binarySearch(tokens, token);
-        if (index < 0)
-        {
-            index = (index + 1) * (-1);
-            if (index >= tokens.size())
-                index = 0;
-        }
-        int totalNodes = tokens.size();
         // Add the token at the index by default
-        tokenList.add((Token) tokens.get(index));
-        startIndex = (index + 1) % totalNodes;
-        // If we found N number of nodes we are good. This loop will just exit. Otherwise just
-        // loop through the list and add until we have N nodes.
-        final int replicas = DatabaseDescriptor.getReplicationFactor(table);
-        for (int i = startIndex, count = 1; count < totalNodes && tokenList.size() < replicas; ++count, i = (i + 1) % totalNodes)
+        Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token);
+        while (endpoints.size() < replicas && iter.hasNext())
         {
-            assert !tokenList.contains(tokens.get(i));
-            tokenList.add((Token) tokens.get(i));
+            endpoints.add(metadata.getEndPoint(iter.next()));
         }
-        for (Token t : tokenList)
-            endpoints.add(metadata.getEndPoint(t));
+
         return endpoints;
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=907368&r1=907367&r2=907368&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Sun Feb  7 02:21:06 2010
@@ -24,6 +24,7 @@
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.collect.*;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.dht.Range;
 
@@ -31,11 +32,6 @@
 
 import org.apache.commons.lang.StringUtils;
 
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.HashMultimap;
-
 public class TokenMetadata
 {
     /* Maintains token to endpoint map of every node in the cluster. */
@@ -405,6 +401,44 @@
         return leavingEndPoints;
     }
 
+    /**
+     * iterator over the Tokens in the given ring, starting with the token for the node owning start
+     * (which does not have to be a Token in the ring)
+     */
+    public static Iterator<Token> ringIterator(final List ring, Token start)
+    {
+        assert ring.size() > 0;
+        int i = Collections.binarySearch(ring, start);
+        if (i < 0)
+        {
+            i = (i + 1) * (-1);
+            if (i >= ring.size())
+            {
+                i = 0;
+            }
+        }
+        final int startIndex = i;
+        return new AbstractIterator<Token>()
+        {
+            int j = startIndex;
+            protected Token computeNext()
+            {
+                if (j < 0)
+                    return endOfData();
+                try
+                {
+                    return (Token) ring.get(j);
+                }
+                finally
+                {
+                    j = (j + 1) % ring.size();
+                    if (j == startIndex)
+                        j = -1;
+                }
+            }
+        };
+    }
+
     /** used by tests */
     public void clearUnsafe()
     {
@@ -487,5 +521,4 @@
 
         return sb.toString();
     }
-
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=907368&r1=907367&r2=907368&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Sun Feb  7 02:21:06 2010
@@ -1108,29 +1108,12 @@
 
     public InetAddress getPrimary(Token token)
     {
-        InetAddress endpoint = FBUtilities.getLocalAddress();
-        List tokens = new ArrayList<Token>(tokenMetadata_.sortedTokens());
+        List tokens = tokenMetadata_.sortedTokens();
         if (tokens.size() > 0)
         {
-            int index = Collections.binarySearch(tokens, token);
-            if (index >= 0)
-            {
-                /*
-                 * retrieve the endpoint based on the token at this index in the
-                 * tokens list
-                 */
-                endpoint = tokenMetadata_.getEndPoint((Token) tokens.get(index));
-            }
-            else
-            {
-                index = (index + 1) * (-1);
-                if (index < tokens.size())
-                    endpoint = tokenMetadata_.getEndPoint((Token) tokens.get(index));
-                else
-                    endpoint = tokenMetadata_.getEndPoint((Token) tokens.get(0));
-            }
+            return tokenMetadata_.getEndPoint(TokenMetadata.ringIterator(tokens, token).next());
         }
-        return endpoint;
+        return FBUtilities.getLocalAddress();
     }
 
     /**