You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/06/14 23:53:10 UTC
svn commit: r954657 - in /cassandra/trunk:
src/java/org/apache/cassandra/locator/
src/java/org/apache/cassandra/service/ test/conf/
test/unit/org/apache/cassandra/locator/
Author: jbellis
Date: Mon Jun 14 21:53:09 2010
New Revision: 954657
URL: http://svn.apache.org/viewvc?rev=954657&view=rev
Log:
Fix bootstrap with DSS and add endpoint caching.
patch by mdennis and jbellis for CASSANDRA-1147
Added:
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/test/conf/cassandra-rack.properties
cassandra/trunk/test/conf/datacenters.properties
cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java
cassandra/trunk/test/unit/org/apache/cassandra/locator/RackAwareStrategyTest.java
cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
Added: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java?rev=954657&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java Mon Jun 14 21:53:09 2010
@@ -0,0 +1,45 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public abstract class AbstractEndpointSnitch implements IEndpointSnitch
+{
+ /* list of subscribers that are notified when cached values from this snitch are invalidated */
+ protected List<AbstractReplicationStrategy> subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
+
+ public void register(AbstractReplicationStrategy subscriber)
+ {
+ subscribers.add(subscriber);
+ }
+
+ protected void invalidateCachedSnitchValues()
+ {
+ for (AbstractReplicationStrategy subscriber : subscribers)
+ subscriber.invalidateCachedSnitchValues();
+ }
+
+ public abstract List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress);
+ public abstract List<InetAddress> sortByProximity(InetAddress address, List<InetAddress> addresses);
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java Mon Jun 14 21:53:09 2010
@@ -1,4 +1,3 @@
-package org.apache.cassandra.locator;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,6 +19,7 @@ package org.apache.cassandra.locator;
*
*/
+package org.apache.cassandra.locator;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -29,7 +29,7 @@ import java.util.*;
* An endpoint snitch tells Cassandra information about network topology that it can use to route
* requests more efficiently.
*/
-public abstract class AbstractRackAwareSnitch implements IEndpointSnitch
+public abstract class AbstractRackAwareSnitch extends AbstractEndpointSnitch
{
/**
* Return the rack for which an endpoint resides in
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Mon Jun 14 21:53:09 2010
@@ -16,57 +16,79 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.cassandra.locator;
import java.net.InetAddress;
import java.util.*;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.service.AbstractWriteResponseHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
import org.apache.cassandra.service.IResponseResolver;
import org.apache.cassandra.service.QuorumResponseHandler;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
/**
* A abstract parent for all replication strategies.
*/
public abstract class AbstractReplicationStrategy
{
- protected static final Logger logger_ = LoggerFactory.getLogger(AbstractReplicationStrategy.class);
+ private static final Logger logger = LoggerFactory.getLogger(AbstractReplicationStrategy.class);
- protected TokenMetadata tokenMetadata_;
- protected final IEndpointSnitch snitch_;
+ private TokenMetadata tokenMetadata;
+ protected final IEndpointSnitch snitch;
+ private final Map<EndpointCacheKey, ArrayList<InetAddress>> cachedEndpoints;
AbstractReplicationStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch)
{
- tokenMetadata_ = tokenMetadata;
- snitch_ = snitch;
+ this.tokenMetadata = tokenMetadata;
+ this.snitch = snitch;
+ cachedEndpoints = new NonBlockingHashMap<EndpointCacheKey, ArrayList<InetAddress>>();
+ this.tokenMetadata.register(this);
+ if (this.snitch != null)
+ this.snitch.register(this);
}
/**
- * get the endpoints that should store the given Token, for the given table.
+ * get the (possibly cached) endpoints that should store the given Token, for the given table.
* Note that while the endpoints are conceptually a Set (no duplicates will be included),
- * we return a List to avoid an extra allocation when sorting by proximity later.
+ * we return a List to avoid an extra allocation when sorting by proximity later
+ * @param searchToken the token the natural endpoints are requested for
+ * @param table the table the natural endpoints are requested for
+ * @return a copy of the natural endpoints for the given token and table
*/
- public abstract ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table);
-
- public ArrayList<InetAddress> getNaturalEndpoints(Token token, String table)
+ public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken, String table)
{
- return getNaturalEndpoints(token, tokenMetadata_, table);
+ // TODO creating a iterator object just to get the closest token is wasteful -- we do in multiple places w/ ringIterator
+ Token keyToken = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken).next();
+ EndpointCacheKey cacheKey = new EndpointCacheKey(table, keyToken);
+ ArrayList<InetAddress> endpoints = cachedEndpoints.get(cacheKey);
+ if (endpoints == null)
+ {
+ TokenMetadata tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
+ keyToken = TokenMetadata.ringIterator(tokenMetadataClone.sortedTokens(), searchToken).next();
+ cacheKey = new EndpointCacheKey(table, keyToken);
+ endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tokenMetadataClone, table));
+ cachedEndpoints.put(cacheKey, endpoints);
+ }
+
+ return new ArrayList<InetAddress>(endpoints);
}
+ public abstract Set<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, String table);
+
public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints,
Multimap<InetAddress, InetAddress> hintedEndpoints,
ConsistencyLevel consistencyLevel,
@@ -132,12 +154,12 @@ public abstract class AbstractReplicatio
*/
public Collection<InetAddress> getWriteEndpoints(Token token, String table, Collection<InetAddress> naturalEndpoints)
{
- if (tokenMetadata_.getPendingRanges(table).isEmpty())
+ if (tokenMetadata.getPendingRanges(table).isEmpty())
return naturalEndpoints;
List<InetAddress> endpoints = new ArrayList<InetAddress>(naturalEndpoints);
- for (Map.Entry<Range, Collection<InetAddress>> entry : tokenMetadata_.getPendingRanges(table).entrySet())
+ for (Map.Entry<Range, Collection<InetAddress>> entry : tokenMetadata.getPendingRanges(table).entrySet())
{
if (entry.getKey().contains(token))
{
@@ -161,7 +183,7 @@ public abstract class AbstractReplicatio
for (Token token : metadata.sortedTokens())
{
Range range = metadata.getPrimaryRangeFor(token);
- for (InetAddress ep : getNaturalEndpoints(token, metadata, table))
+ for (InetAddress ep : calculateNaturalEndpoints(token, metadata, table))
{
map.put(ep, range);
}
@@ -177,7 +199,7 @@ public abstract class AbstractReplicatio
for (Token token : metadata.sortedTokens())
{
Range range = metadata.getPrimaryRangeFor(token);
- for (InetAddress ep : getNaturalEndpoints(token, metadata, table))
+ for (InetAddress ep : calculateNaturalEndpoints(token, metadata, table))
{
map.put(range, ep);
}
@@ -188,7 +210,7 @@ public abstract class AbstractReplicatio
public Multimap<InetAddress, Range> getAddressRanges(String table)
{
- return getAddressRanges(tokenMetadata_, table);
+ return getAddressRanges(tokenMetadata, table);
}
public Collection<Range> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress, String table)
@@ -202,4 +224,25 @@ public abstract class AbstractReplicatio
{
return new QuorumResponseHandler(responseResolver, consistencyLevel, table);
}
+
+ protected static class EndpointCacheKey extends Pair<String, Token>
+ {
+ public EndpointCacheKey(String table, Token keyToken) {super(table, keyToken);}
+ }
+
+ protected void clearCachedEndpoints()
+ {
+ logger.debug("clearing cached endpoints");
+ cachedEndpoints.clear();
+ }
+
+ public void invalidateCachedTokenEndpointValues()
+ {
+ clearCachedEndpoints();
+ }
+
+ public void invalidateCachedSnitchValues()
+ {
+ clearCachedEndpoints();
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java Mon Jun 14 21:53:09 2010
@@ -1,5 +1,3 @@
-package org.apache.cassandra.locator;
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,19 +19,21 @@ package org.apache.cassandra.locator;
*
*/
+package org.apache.cassandra.locator;
import java.net.InetAddress;
import java.util.*;
import java.util.Map.Entry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.collect.Multimap;
import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.utils.ResourceWatcher;
import org.apache.cassandra.service.*;
import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ResourceWatcher;
import org.apache.cassandra.utils.WrappedRunnable;
/**
@@ -52,9 +52,9 @@ import org.apache.cassandra.utils.Wrappe
public class DatacenterShardStrategy extends AbstractReplicationStrategy
{
private static final String DATACENTER_PROPERTY_FILENAME = "datacenters.properties";
- private Map<String, List<Token>> dcTokens;
private AbstractRackAwareSnitch snitch;
- private Map<String, Map<String, Integer>> datacenters = new HashMap<String, Map<String, Integer>>();
+ private volatile Map<String, Map<String, Integer>> datacenters;
+ private static final Logger logger = LoggerFactory.getLogger(DatacenterShardStrategy.class);
public DatacenterShardStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch) throws ConfigurationException
{
@@ -72,108 +72,88 @@ public class DatacenterShardStrategy ext
}
};
ResourceWatcher.watch(DATACENTER_PROPERTY_FILENAME, runnable, 60 * 1000);
-
- loadEndpoints(tokenMetadata);
}
- public void reloadConfiguration() throws ConfigurationException
+ public synchronized void reloadConfiguration() throws ConfigurationException
{
Properties props = PropertyFileSnitch.resourceToProperties(DATACENTER_PROPERTY_FILENAME);
- for (Object key : props.keySet())
+ Map<String, Map<String, Integer>> newDatacenters = new HashMap<String, Map<String, Integer>>();
+ for (Entry entry : props.entrySet())
{
- String[] keys = ((String)key).split(":");
- Map<String, Integer> map = datacenters.get(keys[0]);
- if (null == map)
- {
+ String[] keys = ((String)entry.getKey()).split(":");
+ Map<String, Integer> map = newDatacenters.get(keys[0]);
+ if (map == null)
map = new HashMap<String, Integer>();
- }
- map.put(keys[1], Integer.parseInt((String)props.get(key)));
- datacenters.put(keys[0], map);
+ map.put(keys[1], Integer.parseInt((String) entry.getValue()));
+ newDatacenters.put(keys[0], map);
}
- }
-
- private synchronized void loadEndpoints(TokenMetadata metadata) throws ConfigurationException
- {
- String localDC = snitch.getDatacenter(DatabaseDescriptor.getListenAddress());
- if (localDC == null)
- throw new ConfigurationException("Invalid datacenter configuration; couldn't find local host " + FBUtilities.getLocalAddress());
-
- dcTokens = new HashMap<String, List<Token>>();
- for (Token token : metadata.sortedTokens())
- {
- InetAddress endPoint = metadata.getEndpoint(token);
- String dataCenter = snitch.getDatacenter(endPoint);
- // add tokens to dcmap.
- List<Token> lst = dcTokens.get(dataCenter);
- if (lst == null)
+ datacenters = Collections.unmodifiableMap(newDatacenters);
+ logger.info(DATACENTER_PROPERTY_FILENAME + " changed, clearing endpoint cache");
+ clearCachedEndpoints();
+ }
+
+ public Set<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, String table)
+ {
+ int totalReplicas = getReplicationfactor(table);
+ Map<String, Integer> remainingReplicas = new HashMap<String, Integer>(datacenters.get(table));
+ Map<String, Set<String>> dcUsedRacks = new HashMap<String, Set<String>>();
+ Set<InetAddress> endpoints = new HashSet<InetAddress>(totalReplicas);
+
+ // first pass: only collect replicas on unique racks
+ for (Iterator<Token> iter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken);
+ endpoints.size() < totalReplicas && iter.hasNext();)
+ {
+ Token token = iter.next();
+ InetAddress endpoint = tokenMetadata.getEndpoint(token);
+ String datacenter = snitch.getDatacenter(endpoint);
+ int remaining = remainingReplicas.containsKey(datacenter) ? remainingReplicas.get(datacenter) : 0;
+ if (remaining > 0)
{
- lst = new ArrayList<Token>();
- }
- lst.add(token);
- dcTokens.put(dataCenter, lst);
- }
- for (Entry<String, List<Token>> entry : dcTokens.entrySet())
- {
- List<Token> valueList = entry.getValue();
- Collections.sort(valueList);
- dcTokens.put(entry.getKey(), valueList);
- }
-
- // TODO verify that each DC has enough endpoints for the desired RF
- }
-
- public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken, TokenMetadata metadata, String table)
- {
- ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
-
- if (metadata.sortedTokens().isEmpty())
- return endpoints;
-
- for (String dc : dcTokens.keySet())
- {
- List<Token> tokens = dcTokens.get(dc);
- Set<String> racks = new HashSet<String>();
- // Add the node at the index by default
- Iterator<Token> iter = TokenMetadata.ringIterator(tokens, searchToken);
- InetAddress initialDCHost = metadata.getEndpoint(iter.next());
- assert initialDCHost != null;
- endpoints.add(initialDCHost);
- racks.add(snitch.getRack(initialDCHost));
-
- // find replicas on unique racks
- int replicas = getReplicationFactor(dc, table);
- int localEndpoints = 1;
- while (localEndpoints < replicas && iter.hasNext())
- {
- Token t = iter.next();
- InetAddress endpoint = metadata.getEndpoint(t);
- if (!racks.contains(snitch.getRack(endpoint)))
+ Set<String> usedRacks = dcUsedRacks.get(datacenter);
+ if (usedRacks == null)
+ {
+ usedRacks = new HashSet<String>();
+ dcUsedRacks.put(datacenter, usedRacks);
+ }
+ String rack = snitch.getRack(endpoint);
+ if (!usedRacks.contains(rack))
{
endpoints.add(endpoint);
- localEndpoints++;
+ usedRacks.add(rack);
+ remainingReplicas.put(datacenter, remaining - 1);
}
}
+ }
- if (localEndpoints == replicas)
+ // 2nd pass: if replica count has not been achieved from unique racks, add nodes from the same racks
+ for (Iterator<Token> iter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken);
+ endpoints.size() < totalReplicas && iter.hasNext();)
+ {
+ Token token = iter.next();
+ InetAddress endpoint = tokenMetadata.getEndpoint(token);
+ if (endpoints.contains(endpoint))
continue;
- // if not enough unique racks were found, re-loop and add other endpoints
- iter = TokenMetadata.ringIterator(tokens, searchToken);
- iter.next(); // skip the first one since we already know it's used
- while (localEndpoints < replicas && iter.hasNext())
+ String datacenter = snitch.getDatacenter(endpoint);
+ int remaining = remainingReplicas.containsKey(datacenter) ? remainingReplicas.get(datacenter) : 0;
+ if (remaining > 0)
{
- Token t = iter.next();
- if (!endpoints.contains(metadata.getEndpoint(t)))
- {
- localEndpoints++;
- endpoints.add(metadata.getEndpoint(t));
- }
+ endpoints.add(endpoint);
+ remainingReplicas.put(datacenter, remaining - 1);
}
}
return endpoints;
}
+ public int getReplicationfactor(String table)
+ {
+ int total = 0;
+ for (int repFactor : datacenters.get(table).values())
+ total += repFactor;
+ return total;
+ }
+
public int getReplicationFactor(String dc, String table)
{
return datacenters.get(table).get(dc);
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java Mon Jun 14 21:53:09 2010
@@ -18,13 +18,9 @@
package org.apache.cassandra.locator;
-import java.net.UnknownHostException;
-
import java.net.InetAddress;
-import java.util.Set;
-import java.util.List;
import java.util.Collection;
-
+import java.util.List;
/**
* This interface helps determine location of node in the data center relative to another node.
@@ -43,5 +39,10 @@ public interface IEndpointSnitch
* This method will sort the <tt>List</tt> by proximity to the given address.
*/
public List<InetAddress> sortByProximity(InetAddress address, List<InetAddress> addresses);
-}
+ /**
+ * register to receive notification when the endpoint snitch has changed the answers it was providing.
+ * @param subscriber the subscriber to notify
+ */
+ public void register(AbstractReplicationStrategy subscriber);
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java Mon Jun 14 21:53:09 2010
@@ -24,23 +24,25 @@ import java.net.InetAddress;
import java.util.Properties;
import java.util.StringTokenizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ResourceWatcher;
import org.apache.cassandra.utils.WrappedRunnable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Used to determine if two IP's are in the same datacenter or on the same rack.
* <p/>
* Based on a properties file configuration.
*/
-public class PropertyFileSnitch extends AbstractRackAwareSnitch {
+public class PropertyFileSnitch extends AbstractRackAwareSnitch
+{
/**
* A list of properties with keys being host:port and values being datacenter:rack
*/
- private volatile Properties hostProperties = new Properties();
+ private volatile Properties hostProperties;
/**
* The default rack property file to be read.
@@ -112,6 +114,7 @@ public class PropertyFileSnitch extends
public void reloadConfiguration() throws ConfigurationException
{
hostProperties = resourceToProperties(RACK_PROPERTY_FILENAME);
+ invalidateCachedSnitchValues();
}
public static Properties resourceToProperties(String filename) throws ConfigurationException
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Mon Jun 14 21:53:09 2010
@@ -16,16 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.cassandra.locator;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+import java.net.InetAddress;
+import java.util.*;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Token;
-import java.net.InetAddress;
/*
* This Replication Strategy returns the nodes responsible for a given
@@ -43,10 +41,10 @@ public class RackAwareStrategy extends A
throw new IllegalArgumentException(("RackAwareStrategy requires AbstractRackAwareSnitch."));
}
- public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table)
+ public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table)
{
int replicas = DatabaseDescriptor.getReplicationFactor(table);
- ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(replicas);
+ Set<InetAddress> endpoints = new HashSet<InetAddress>(replicas);
List<Token> tokens = metadata.sortedTokens();
if (tokens.isEmpty())
@@ -60,7 +58,7 @@ public class RackAwareStrategy extends A
boolean bOtherRack = false;
while (endpoints.size() < replicas && iter.hasNext())
{
- AbstractRackAwareSnitch snitch = (AbstractRackAwareSnitch)snitch_;
+ AbstractRackAwareSnitch snitch = (AbstractRackAwareSnitch) this.snitch;
// First try to find one in a different data center
Token t = iter.next();
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java Mon Jun 14 21:53:09 2010
@@ -16,15 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.cassandra.locator;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+import java.net.InetAddress;
+import java.util.*;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Token;
-import java.net.InetAddress;
/**
* This class returns the nodes responsible for a given
@@ -39,11 +38,11 @@ public class RackUnawareStrategy extends
super(tokenMetadata, snitch);
}
- public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table)
+ public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table)
{
int replicas = DatabaseDescriptor.getReplicationFactor(table);
List<Token> tokens = metadata.sortedTokens();
- ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(replicas);
+ Set<InetAddress> endpoints = new HashSet<InetAddress>(replicas);
if (tokens.isEmpty())
return endpoints;
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java Mon Jun 14 21:53:09 2010
@@ -19,18 +19,19 @@
package org.apache.cassandra.locator;
import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
/**
* A simple endpoint snitch implementation does not sort addresses by
* proximity.
*/
-public class SimpleSnitch implements IEndpointSnitch
+public class SimpleSnitch extends AbstractEndpointSnitch
{
public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses)
{
- List<InetAddress> list = new ArrayList<InetAddress>(addresses);
- return list;
+ return new ArrayList<InetAddress>(addresses);
}
public List<InetAddress> sortByProximity(final InetAddress address, List<InetAddress> addresses)
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Mon Jun 14 21:53:09 2010
@@ -18,23 +18,23 @@
package org.apache.cassandra.locator;
+import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import com.google.common.collect.*;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.dht.Range;
-
-import java.net.InetAddress;
-
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.*;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
public class TokenMetadata
{
private static Logger logger = LoggerFactory.getLogger(TokenMetadata.class);
@@ -64,6 +64,9 @@ public class TokenMetadata
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
private List<Token> sortedTokens;
+ /* list of subscribers that are notified when the tokenToEndpointMap changed */
+ private final CopyOnWriteArrayList<AbstractReplicationStrategy> subscribers;
+
public TokenMetadata()
{
this(null);
@@ -78,6 +81,7 @@ public class TokenMetadata
leavingEndpoints = new HashSet<InetAddress>();
pendingRanges = new ConcurrentHashMap<String, Multimap<Range, InetAddress>>();
sortedTokens = sortTokens();
+ subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
}
private List<Token> sortTokens()
@@ -116,6 +120,7 @@ public class TokenMetadata
sortedTokens = sortTokens();
}
leavingEndpoints.remove(endpoint);
+ fireTokenToEndpointMapChanged();
}
finally
{
@@ -205,6 +210,7 @@ public class TokenMetadata
tokenToEndpointMap.inverse().remove(endpoint);
leavingEndpoints.remove(endpoint);
sortedTokens = sortTokens();
+ fireTokenToEndpointMapChanged();
}
finally
{
@@ -454,6 +460,7 @@ public class TokenMetadata
tokenToEndpointMap.clear();
leavingEndpoints.clear();
pendingRanges.clear();
+ fireTokenToEndpointMapChanged();
}
public String toString()
@@ -529,4 +536,17 @@ public class TokenMetadata
return sb.toString();
}
+
+ protected void fireTokenToEndpointMapChanged()
+ {
+ for (AbstractReplicationStrategy subscriber : subscribers)
+ {
+ subscriber.invalidateCachedTokenEndpointValues();
+ }
+ }
+
+ public void register(AbstractReplicationStrategy subscriber)
+ {
+ subscribers.add(subscriber);
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Jun 14 21:53:09 2010
@@ -22,20 +22,24 @@ import java.io.IOError;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
+import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.net.InetAddress;
-import javax.management.*;
+import java.util.concurrent.*;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
-import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
@@ -43,27 +47,25 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.migration.AddKeyspace;
import org.apache.cassandra.db.migration.Migration;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.DeletionService;
import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.locator.*;
-import org.apache.cassandra.net.*;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ResponseVerbHandler;
import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
import org.apache.cassandra.streaming.*;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.cassandra.io.util.FileUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.log4j.Level;
-import org.apache.commons.lang.StringUtils;
-
-import com.google.common.collect.Multimap;
-import com.google.common.collect.HashMultimap;
/*
* This abstraction contains the token/identifier of this node
@@ -764,8 +766,8 @@ public class StorageService implements I
// all leaving nodes are gone.
for (Range range : affectedRanges)
{
- List<InetAddress> currentEndpoints = strategy.getNaturalEndpoints(range.right, tm, table);
- List<InetAddress> newEndpoints = strategy.getNaturalEndpoints(range.right, allLeftMetadata, table);
+ Set<InetAddress> currentEndpoints = strategy.calculateNaturalEndpoints(range.right, tm, table);
+ Set<InetAddress> newEndpoints = strategy.calculateNaturalEndpoints(range.right, allLeftMetadata, table);
newEndpoints.removeAll(currentEndpoints);
pendingRanges.putAll(range, newEndpoints);
}
@@ -870,11 +872,11 @@ public class StorageService implements I
if (logger_.isDebugEnabled())
logger_.debug("Node " + endpoint + " ranges [" + StringUtils.join(ranges, ", ") + "]");
- Map<Range, ArrayList<InetAddress>> currentReplicaEndpoints = new HashMap<Range, ArrayList<InetAddress>>();
+ Map<Range, Set<InetAddress>> currentReplicaEndpoints = new HashMap<Range, Set<InetAddress>>();
// Find (for each range) all nodes that store replicas for these ranges as well
for (Range range : ranges)
- currentReplicaEndpoints.put(range, getReplicationStrategy(table).getNaturalEndpoints(range.right, tokenMetadata_, table));
+ currentReplicaEndpoints.put(range, getReplicationStrategy(table).calculateNaturalEndpoints(range.right, tokenMetadata_, table));
TokenMetadata temp = tokenMetadata_.cloneAfterAllLeft();
@@ -892,7 +894,7 @@ public class StorageService implements I
// range.
for (Range range : ranges)
{
- ArrayList<InetAddress> newReplicaEndpoints = getReplicationStrategy(table).getNaturalEndpoints(range.right, temp, table);
+ Set<InetAddress> newReplicaEndpoints = getReplicationStrategy(table).calculateNaturalEndpoints(range.right, temp, table);
newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range));
if (logger_.isDebugEnabled())
if (newReplicaEndpoints.isEmpty())
Modified: cassandra/trunk/test/conf/cassandra-rack.properties
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra-rack.properties?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra-rack.properties (original)
+++ cassandra/trunk/test/conf/cassandra-rack.properties Mon Jun 14 21:53:09 2010
@@ -32,5 +32,11 @@
10.21.119.14=DC3:RAC2
10.20.114.15=DC2:RAC2
+127.0.0.1=DC1:RAC1
+127.0.0.2=DC2:RAC2
+127.0.0.3=DC3:RAC3
+127.0.0.4=DC4:RAC4
+127.0.0.5=DC3:RAC3
+
# default for unknown nodes
-default=DC1:r1
\ No newline at end of file
+default=DC1:r1
Modified: cassandra/trunk/test/conf/datacenters.properties
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/datacenters.properties?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/test/conf/datacenters.properties (original)
+++ cassandra/trunk/test/conf/datacenters.properties Mon Jun 14 21:53:09 2010
@@ -20,4 +20,7 @@
# keyspace\:datacenter=replication factor
Keyspace1\:DC1=3
Keyspace1\:DC2=2
-Keyspace1\:DC3=1
\ No newline at end of file
+Keyspace1\:DC3=1
+Keyspace3\:DC1=3
+Keyspace3\:DC2=2
+Keyspace3\:DC3=1
\ No newline at end of file
Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java Mon Jun 14 21:53:09 2010
@@ -1,3 +1,22 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
package org.apache.cassandra.locator;
import java.io.IOException;
@@ -5,15 +24,13 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashSet;
-
import javax.xml.parsers.ParserConfigurationException;
+import org.junit.Test;
+
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.dht.StringToken;
import org.apache.cassandra.dht.Token;
-
-import org.junit.Test;
-
import org.xml.sax.SAXException;
public class DatacenterShardStrategyTest
Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/RackAwareStrategyTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackAwareStrategyTest.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/RackAwareStrategyTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/RackAwareStrategyTest.java Mon Jun 14 21:53:09 2010
@@ -16,9 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.cassandra.locator;
-import static org.junit.Assert.assertEquals;
+package org.apache.cassandra.locator;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -27,13 +26,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.BigIntegerToken;
import org.apache.cassandra.dht.Token;
-import org.junit.Before;
-import org.junit.Test;
-
public class RackAwareStrategyTest
{
private List<Token> endpointTokens;
@@ -160,7 +160,7 @@ public class RackAwareStrategyTest
{
for (Token keyToken : keyTokens)
{
- List<InetAddress> endpoints = strategy.getNaturalEndpoints(keyToken, tmd, table);
+ List<InetAddress> endpoints = strategy.getNaturalEndpoints(keyToken, table);
for (int j = 0; j < endpoints.size(); j++)
{
ArrayList<InetAddress> hostsExpected = expectedResults.get(keyToken.toString());
Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=954657&r1=954656&r2=954657&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java Mon Jun 14 21:53:09 2010
@@ -18,28 +18,21 @@
*/
package org.apache.cassandra.locator;
-import static org.junit.Assert.*;
-
-import java.util.Arrays;
-import java.util.List;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.service.StorageServiceAccessor;
-import org.junit.Test;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.RandomPartitioner;
-import org.apache.cassandra.dht.BigIntegerToken;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.dht.OrderPreservingPartitioner;
-import org.apache.cassandra.dht.StringToken;
+import org.apache.cassandra.dht.*;
import org.apache.cassandra.service.StorageService;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import org.apache.cassandra.service.StorageServiceAccessor;
public class RackUnawareStrategyTest extends SchemaLoader
{
@@ -71,8 +64,7 @@ public class RackUnawareStrategyTest ext
endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
keyTokens.add(new BigIntegerToken(String.valueOf(10 * i + 5)));
}
- for (String table : DatabaseDescriptor.getNonSystemTables())
- testGetEndpoints(tmd, strategy, endpointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]), table);
+ verifyGetNaturalEndpoints(tmd, strategy, endpointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]));
}
@Test
@@ -88,29 +80,31 @@ public class RackUnawareStrategyTest ext
endpointTokens.add(new StringToken(String.valueOf((char)('a' + i * 2))));
keyTokens.add(partitioner.getToken(String.valueOf((char)('a' + i * 2 + 1)).getBytes()));
}
- for (String table : DatabaseDescriptor.getNonSystemTables())
- testGetEndpoints(tmd, strategy, endpointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]), table);
+ verifyGetNaturalEndpoints(tmd, strategy, endpointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]));
}
// given a list of endpoint tokens, and a set of key tokens falling between the endpoint tokens,
// make sure that the Strategy picks the right endpoints for the keys.
- private void testGetEndpoints(TokenMetadata tmd, AbstractReplicationStrategy strategy, Token[] endpointTokens, Token[] keyTokens, String table) throws UnknownHostException
+ private void verifyGetNaturalEndpoints(TokenMetadata tmd, AbstractReplicationStrategy strategy, Token[] endpointTokens, Token[] keyTokens) throws UnknownHostException
{
- List<InetAddress> hosts = new ArrayList<InetAddress>();
- for (int i = 0; i < endpointTokens.length; i++)
+ for (String table : DatabaseDescriptor.getNonSystemTables())
{
- InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
- tmd.updateNormalToken(endpointTokens[i], ep);
- hosts.add(ep);
- }
+ List<InetAddress> hosts = new ArrayList<InetAddress>();
+ for (int i = 0; i < endpointTokens.length; i++)
+ {
+ InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
+ tmd.updateNormalToken(endpointTokens[i], ep);
+ hosts.add(ep);
+ }
- for (int i = 0; i < keyTokens.length; i++)
- {
- List<InetAddress> endpoints = strategy.getNaturalEndpoints(keyTokens[i], table);
- assertEquals(DatabaseDescriptor.getReplicationFactor(table), endpoints.size());
- for (int j = 0; j < endpoints.size(); j++)
+ for (int i = 0; i < keyTokens.length; i++)
{
- assertEquals(endpoints.get(j), hosts.get((i + j + 1) % hosts.size()));
+ List<InetAddress> endpoints = strategy.getNaturalEndpoints(keyTokens[i], table);
+ assertEquals(DatabaseDescriptor.getReplicationFactor(table), endpoints.size());
+ List<InetAddress> correctEndpoints = new ArrayList<InetAddress>();
+ for (int j = 0; j < endpoints.size(); j++)
+ correctEndpoints.add(hosts.get((i + j + 1) % hosts.size()));
+ assertEquals(new HashSet<InetAddress>(correctEndpoints), new HashSet<InetAddress>(endpoints));
}
}
}
Added: cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java?rev=954657&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java Mon Jun 14 21:53:09 2010
@@ -0,0 +1,156 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.locator;
+
+import java.lang.reflect.Constructor;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.dht.BigIntegerToken;
+import org.apache.cassandra.dht.Token;
+
+public class ReplicationStrategyEndpointCacheTest extends SchemaLoader
+{
+ private TokenMetadata tmd;
+ private Token searchToken;
+ private AbstractReplicationStrategy strategy;
+
+ public void setup(Class stratClass) throws Exception
+ {
+ tmd = new TokenMetadata();
+ searchToken = new BigIntegerToken(String.valueOf(15));
+ Constructor constructor = stratClass.getConstructor(TokenMetadata.class, IEndpointSnitch.class);
+ strategy = (AbstractReplicationStrategy) constructor.newInstance(tmd, new PropertyFileSnitch());
+
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(10)), InetAddress.getByName("127.0.0.1"));
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(20)), InetAddress.getByName("127.0.0.2"));
+ }
+
+ @Test
+ public void testEndpointsWereCached() throws Exception
+ {
+ runEndpointsWereCachedTest(FakeRackUnawareStrategy.class);
+ runEndpointsWereCachedTest(FakeRackAwareStrategy.class);
+ runEndpointsWereCachedTest(FakeDatacenterShardStrategy.class);
+ }
+
+ public void runEndpointsWereCachedTest(Class stratClass) throws Exception
+ {
+ setup(stratClass);
+ assert strategy.getNaturalEndpoints(searchToken, "Keyspace3").equals(strategy.getNaturalEndpoints(searchToken, "Keyspace3"));
+ }
+
+ @Test
+ public void testCacheRespectsTokenChanges() throws Exception
+ {
+ runCacheRespectsTokenChangesTest(RackUnawareStrategy.class);
+ runCacheRespectsTokenChangesTest(RackAwareStrategy.class);
+ runCacheRespectsTokenChangesTest(DatacenterShardStrategy.class);
+ }
+
+ public void runCacheRespectsTokenChangesTest(Class stratClass) throws Exception
+ {
+ // TODO DSS is asked to provide a total of 6 replicas, but we never give it 6 endpoints.
+ // thus we are testing undefined behavior, at best.
+ setup(stratClass);
+ ArrayList<InetAddress> endpoints;
+
+ endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
+ assert endpoints.size() == 2 : StringUtils.join(endpoints, ",");
+
+ // test token addition
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.3"));
+ endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
+ assert endpoints.size() == 3 : StringUtils.join(endpoints, ",");
+
+ // test token removal
+ tmd.removeEndpoint(InetAddress.getByName("127.0.0.2"));
+ endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
+ assert endpoints.size() == 2 : StringUtils.join(endpoints, ",");
+
+ // test token change
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.5"));
+ endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3");
+ assert endpoints.size() == 2 : StringUtils.join(endpoints, ",");
+ assert endpoints.contains(InetAddress.getByName("127.0.0.5"));
+ assert !endpoints.contains(InetAddress.getByName("127.0.0.3"));
+ }
+
+ protected static class FakeRackUnawareStrategy extends RackUnawareStrategy
+ {
+ private boolean called = false;
+
+ public FakeRackUnawareStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch)
+ {
+ super(tokenMetadata, snitch);
+ }
+
+ @Override
+ public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table)
+ {
+ assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
+ called = true;
+ return super.calculateNaturalEndpoints(token, metadata, table);
+ }
+ }
+
+ protected static class FakeRackAwareStrategy extends RackAwareStrategy
+ {
+ private boolean called = false;
+
+ public FakeRackAwareStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch)
+ {
+ super(tokenMetadata, snitch);
+ }
+
+ @Override
+ public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table)
+ {
+ assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
+ called = true;
+ return super.calculateNaturalEndpoints(token, metadata, table);
+ }
+ }
+
+ protected static class FakeDatacenterShardStrategy extends DatacenterShardStrategy
+ {
+ private boolean called = false;
+
+ public FakeDatacenterShardStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch) throws ConfigurationException
+ {
+ super(tokenMetadata, snitch);
+ }
+
+ @Override
+ public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table)
+ {
+ assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
+ called = true;
+ return super.calculateNaturalEndpoints(token, metadata, table);
+ }
+ }
+}