You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/02/07 03:21:07 UTC
svn commit: r907368 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra:
locator/DatacenterShardStategy.java locator/RackAwareStrategy.java
locator/RackUnawareStrategy.java locator/TokenMetadata.java
service/StorageService.java
Author: jbellis
Date: Sun Feb 7 02:21:06 2010
New Revision: 907368
URL: http://svn.apache.org/viewvc?rev=907368&view=rev
Log:
add TokenMetadata.ringIterator to replace one-offs of the same functionality
patch by jbellis; reviewed by Stu Hood for CASSANDRA-771
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java?rev=907368&r1=907367&r2=907368&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java Sun Feb 7 02:21:06 2010
@@ -142,54 +142,37 @@
for (String dc : dcMap.keySet())
{
- int foundCount = 0;
- ArrayList<InetAddress> forloopReturn = new ArrayList<InetAddress>();
int replicas_ = dcReplicationFactor.get(dc);
- List tokens = dcMap.get(dc);
+ ArrayList<InetAddress> forloopReturn = new ArrayList<InetAddress>(replicas_);
+ List<Token> tokens = dcMap.get(dc);
boolean bOtherRack = false;
boolean doneDataCenterItr;
- int index = Collections.binarySearch(tokens, searchToken);
- if (index < 0)
- {
- index = (index + 1) * (-1);
- if (index >= tokens.size())
- {
- index = 0;
- }
- }
- int totalNodes = tokens.size();
// Add the node at the index by default
- InetAddress primaryHost = metadata.getEndPoint((Token) tokens.get(index));
+ Iterator<Token> iter = TokenMetadata.ringIterator(tokens, searchToken);
+ InetAddress primaryHost = metadata.getEndPoint(iter.next());
forloopReturn.add(primaryHost);
- foundCount++;
- if (replicas_ == 1)
- {
- continue;
- }
- int startIndex = (index + 1) % totalNodes;
- for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas_; ++count, i = (i + 1) % totalNodes)
+ while (forloopReturn.size() < replicas_ && iter.hasNext())
{
- InetAddress endPointOfIntrest = metadata.getEndPoint((Token) tokens.get(i));
- if ((replicas_ - 1) > foundCount)
+ Token t = iter.next();
+ InetAddress endPointOfIntrest = metadata.getEndPoint(t);
+ if (forloopReturn.size() < replicas_ - 1)
{
forloopReturn.add(endPointOfIntrest);
- foundCount++;
continue;
}
else
{
doneDataCenterItr = true;
}
-
+
// Now try to find one on a different rack
if (!bOtherRack)
{
if (!((DatacenterEndPointSnitch)snitch_).isOnSameRack(primaryHost, endPointOfIntrest))
{
- forloopReturn.add(metadata.getEndPoint((Token) tokens.get(i)));
+ forloopReturn.add(metadata.getEndPoint(t));
bOtherRack = true;
- foundCount++;
}
}
// If both already found exit loop.
@@ -204,13 +187,16 @@
* exit. Otherwise just loop through the list and add until we
* have N nodes.
*/
- for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas_; ++count, i = (i + 1) % totalNodes)
+ if (forloopReturn.size() < replicas_)
{
- Token t = (Token) tokens.get(i);
- if (!forloopReturn.contains(metadata.getEndPoint(t)))
+ iter = TokenMetadata.ringIterator(tokens, searchToken);
+ while (forloopReturn.size() < replicas_ && iter.hasNext())
{
- forloopReturn.add(metadata.getEndPoint(t));
- foundCount++;
+ Token t = iter.next();
+ if (!forloopReturn.contains(metadata.getEndPoint(t)))
+ {
+ forloopReturn.add(metadata.getEndPoint(t));
+ }
}
}
endpoints.addAll(forloopReturn);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=907368&r1=907367&r2=907368&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Sun Feb 7 02:21:06 2010
@@ -21,6 +21,7 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -46,41 +47,25 @@
public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table)
{
- int startIndex;
- ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
- boolean bDataCenter = false;
- boolean bOtherRack = false;
- int foundCount = 0;
- List tokens = metadata.sortedTokens();
+ int replicas = DatabaseDescriptor.getReplicationFactor(table);
+ ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(replicas);
+ List<Token> tokens = metadata.sortedTokens();
if (tokens.isEmpty())
return endpoints;
- int index = Collections.binarySearch(tokens, token);
- if(index < 0)
- {
- index = (index + 1) * (-1);
- if (index >= tokens.size())
- index = 0;
- }
- int totalNodes = tokens.size();
- // Add the node at the index by default
- Token primaryToken = (Token) tokens.get(index);
+ Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token);
+ Token primaryToken = iter.next();
endpoints.add(metadata.getEndPoint(primaryToken));
- foundCount++;
- final int replicas = DatabaseDescriptor.getReplicationFactor(table);
- if (replicas == 1)
- {
- return endpoints;
- }
- startIndex = (index + 1)%totalNodes;
- for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas; ++count, i = (i + 1) % totalNodes)
+ boolean bDataCenter = false;
+ boolean bOtherRack = false;
+ while (endpoints.size() < replicas && iter.hasNext())
{
try
{
// First try to find one in a different data center
- Token t = (Token) tokens.get(i);
+ Token t = iter.next();
if (!((EndPointSnitch)snitch_).isInSameDataCenter(metadata.getEndPoint(primaryToken), metadata.getEndPoint(t)))
{
// If we have already found something in a diff datacenter no need to find another
@@ -88,7 +73,6 @@
{
endpoints.add(metadata.getEndPoint(t));
bDataCenter = true;
- foundCount++;
}
continue;
}
@@ -101,7 +85,6 @@
{
endpoints.add(metadata.getEndPoint(t));
bOtherRack = true;
- foundCount++;
}
}
}
@@ -111,17 +94,20 @@
}
}
+
// If we found N number of nodes we are good. This loop wil just exit. Otherwise just
// loop through the list and add until we have N nodes.
- for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas; ++count, i = (i+1)%totalNodes)
+ if (endpoints.size() < replicas)
{
- Token t = (Token) tokens.get(i);
- if (!endpoints.contains(metadata.getEndPoint(t)))
+ iter = TokenMetadata.ringIterator(tokens, token);
+ while (endpoints.size() < replicas && iter.hasNext())
{
- endpoints.add(metadata.getEndPoint(t));
- foundCount++;
+ Token t = iter.next();
+ if (!endpoints.contains(metadata.getEndPoint(t)))
+ endpoints.add(metadata.getEndPoint(t));
}
}
+
return endpoints;
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=907368&r1=907367&r2=907368&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java Sun Feb 7 02:21:06 2010
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -41,35 +42,20 @@
public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table)
{
- int startIndex;
- List<Token> tokenList = new ArrayList<Token>();
- List tokens = new ArrayList<Token>(metadata.sortedTokens());
- ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(tokenList.size());
+ int replicas = DatabaseDescriptor.getReplicationFactor(table);
+ List<Token> tokens = metadata.sortedTokens();
+ ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(replicas);
if (tokens.isEmpty())
return endpoints;
- int index = Collections.binarySearch(tokens, token);
- if (index < 0)
- {
- index = (index + 1) * (-1);
- if (index >= tokens.size())
- index = 0;
- }
- int totalNodes = tokens.size();
// Add the token at the index by default
- tokenList.add((Token) tokens.get(index));
- startIndex = (index + 1) % totalNodes;
- // If we found N number of nodes we are good. This loop will just exit. Otherwise just
- // loop through the list and add until we have N nodes.
- final int replicas = DatabaseDescriptor.getReplicationFactor(table);
- for (int i = startIndex, count = 1; count < totalNodes && tokenList.size() < replicas; ++count, i = (i + 1) % totalNodes)
+ Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token);
+ while (endpoints.size() < replicas && iter.hasNext())
{
- assert !tokenList.contains(tokens.get(i));
- tokenList.add((Token) tokens.get(i));
+ endpoints.add(metadata.getEndPoint(iter.next()));
}
- for (Token t : tokenList)
- endpoints.add(metadata.getEndPoint(t));
+
return endpoints;
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=907368&r1=907367&r2=907368&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Sun Feb 7 02:21:06 2010
@@ -24,6 +24,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.common.collect.*;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.dht.Range;
@@ -31,11 +32,6 @@
import org.apache.commons.lang.StringUtils;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.HashMultimap;
-
public class TokenMetadata
{
/* Maintains token to endpoint map of every node in the cluster. */
@@ -405,6 +401,44 @@
return leavingEndPoints;
}
+ /**
+ * iterator over the Tokens in the given ring, starting with the token for the node owning start
+ * (which does not have to be a Token in the ring)
+ */
+ public static Iterator<Token> ringIterator(final List ring, Token start)
+ {
+ assert ring.size() > 0;
+ int i = Collections.binarySearch(ring, start);
+ if (i < 0)
+ {
+ i = (i + 1) * (-1);
+ if (i >= ring.size())
+ {
+ i = 0;
+ }
+ }
+ final int startIndex = i;
+ return new AbstractIterator<Token>()
+ {
+ int j = startIndex;
+ protected Token computeNext()
+ {
+ if (j < 0)
+ return endOfData();
+ try
+ {
+ return (Token) ring.get(j);
+ }
+ finally
+ {
+ j = (j + 1) % ring.size();
+ if (j == startIndex)
+ j = -1;
+ }
+ }
+ };
+ }
+
/** used by tests */
public void clearUnsafe()
{
@@ -487,5 +521,4 @@
return sb.toString();
}
-
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=907368&r1=907367&r2=907368&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Sun Feb 7 02:21:06 2010
@@ -1108,29 +1108,12 @@
public InetAddress getPrimary(Token token)
{
- InetAddress endpoint = FBUtilities.getLocalAddress();
- List tokens = new ArrayList<Token>(tokenMetadata_.sortedTokens());
+ List tokens = tokenMetadata_.sortedTokens();
if (tokens.size() > 0)
{
- int index = Collections.binarySearch(tokens, token);
- if (index >= 0)
- {
- /*
- * retrieve the endpoint based on the token at this index in the
- * tokens list
- */
- endpoint = tokenMetadata_.getEndPoint((Token) tokens.get(index));
- }
- else
- {
- index = (index + 1) * (-1);
- if (index < tokens.size())
- endpoint = tokenMetadata_.getEndPoint((Token) tokens.get(index));
- else
- endpoint = tokenMetadata_.getEndPoint((Token) tokens.get(0));
- }
+ return tokenMetadata_.getEndPoint(TokenMetadata.ringIterator(tokens, token).next());
}
- return endpoint;
+ return FBUtilities.getLocalAddress();
}
/**