You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2010/08/12 16:39:01 UTC
svn commit: r984806 - in
/cassandra/trunk/src/java/org/apache/cassandra/locator:
AbstractEndpointSnitch.java AbstractReplicationStrategy.java
DynamicEndpointSnitch.java IEndpointSnitch.java PropertyFileSnitch.java
Author: gdusbabek
Date: Thu Aug 12 14:39:01 2010
New Revision: 984806
URL: http://svn.apache.org/viewvc?rev=984806&view=rev
Log:
move endpoint cache from ARS to AES. patch by gdusbabek, reviewed by jbellis. CASSANDRA-1350
Modified:
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java?rev=984806&r1=984805&r2=984806&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java Thu Aug 12 14:39:01 2010
@@ -19,26 +19,41 @@
package org.apache.cassandra.locator;
+import org.apache.cassandra.dht.Token;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.net.InetAddress;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
public abstract class AbstractEndpointSnitch implements IEndpointSnitch
{
+ private static final Logger logger = LoggerFactory.getLogger(AbstractEndpointSnitch.class);
+
/* list of subscribers that are notified when cached values from this snitch are invalidated */
protected List<AbstractReplicationStrategy> subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
+
+ private volatile Map<Token, ArrayList<InetAddress>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>();
+
+ public ArrayList<InetAddress> getCachedEndpoints(Token t)
+ {
+ return cachedEndpoints.get(t);
+ }
- public void register(AbstractReplicationStrategy subscriber)
+ public void cacheEndpoint(Token t, ArrayList<InetAddress> addr)
{
- subscribers.add(subscriber);
+ cachedEndpoints.put(t, addr);
}
- protected void invalidateCachedSnitchValues()
+ public void clearEndpointCache()
{
- for (AbstractReplicationStrategy subscriber : subscribers)
- subscriber.invalidateCachedSnitchValues();
+ logger.debug("clearing cached endpoints");
+ cachedEndpoints.clear();
}
public abstract List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress);
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=984806&r1=984805&r2=984806&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Thu Aug 12 14:39:01 2010
@@ -52,7 +52,6 @@ public abstract class AbstractReplicatio
public String table;
private TokenMetadata tokenMetadata;
public final IEndpointSnitch snitch;
- private volatile Map<Token, ArrayList<InetAddress>> cachedEndpoints;
public Map<String, String> configOptions;
AbstractReplicationStrategy(String table, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
@@ -62,7 +61,6 @@ public abstract class AbstractReplicatio
assert tokenMetadata != null;
this.tokenMetadata = tokenMetadata;
this.snitch = snitch;
- cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>();
this.tokenMetadata.register(this);
this.configOptions = configOptions;
this.table = table;
@@ -80,13 +78,13 @@ public abstract class AbstractReplicatio
{
int replicas = getReplicationFactor();
Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
- ArrayList<InetAddress> endpoints = cachedEndpoints.get(keyToken);
+ ArrayList<InetAddress> endpoints = snitch.getCachedEndpoints(keyToken);
if (endpoints == null)
{
TokenMetadata tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken);
endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tokenMetadataClone));
- cachedEndpoints.put(keyToken, endpoints);
+ snitch.cacheEndpoint(keyToken, endpoints);
}
// calculateNaturalEndpoints should have checked this already, this is a safety
@@ -218,20 +216,9 @@ public abstract class AbstractReplicatio
return new QuorumResponseHandler(responseResolver, consistencyLevel, table);
}
- protected void clearCachedEndpoints()
- {
- logger.debug("clearing cached endpoints");
- cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>();
- }
-
public void invalidateCachedTokenEndpointValues()
{
- clearCachedEndpoints();
- }
-
- public void invalidateCachedSnitchValues()
- {
- clearCachedEndpoints();
+ snitch.clearEndpointCache();
}
public static AbstractReplicationStrategy createReplicationStrategy(String table,
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=984806&r1=984805&r2=984806&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Thu Aug 12 14:39:01 2010
@@ -28,16 +28,10 @@ import java.lang.management.ManagementFa
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.ResponseVerbHandler;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.AbstractStatsDeque;
-import org.apache.cassandra.locator.IEndpointSnitch;
-import org.apache.cassandra.locator.ILatencyPublisher;
-import org.apache.cassandra.locator.ILatencySubscriber;
-import org.apache.cassandra.locator.AbstractEndpointSnitch;
-import org.apache.cassandra.locator.DynamicEndpointSnitchMBean;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -90,6 +84,21 @@ public class DynamicEndpointSnitch exten
}
}
+ public ArrayList<InetAddress> getCachedEndpoints(Token t)
+ {
+ return subsnitch.getCachedEndpoints(t);
+ }
+
+ public void cacheEndpoint(Token t, ArrayList<InetAddress> addr)
+ {
+ subsnitch.cacheEndpoint(t, addr);
+ }
+
+ public void clearEndpointCache()
+ {
+ subsnitch.clearEndpointCache();
+ }
+
public String getRack(InetAddress endpoint)
{
return subsnitch.getRack(endpoint);
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java?rev=984806&r1=984805&r2=984806&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java Thu Aug 12 14:39:01 2010
@@ -18,7 +18,10 @@
package org.apache.cassandra.locator;
+import org.apache.cassandra.dht.Token;
+
import java.net.InetAddress;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -51,13 +54,22 @@ public interface IEndpointSnitch
public List<InetAddress> sortByProximity(InetAddress address, List<InetAddress> addresses);
/**
- * register to receive notification when the endpoint snitch has changed the answers it was providing.
- * @param subscriber the subscriber to notify
+ * compares two endpoints in relation to the target endpoint, returning as Comparator.compare would
*/
- public void register(AbstractReplicationStrategy subscriber);
+ public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);
/**
- * compares two endpoints in relation to the target endpoint, returning as Comparator.compare would
+ * returns a list of cached endpoints for a given token.
*/
- public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);
+ public ArrayList<InetAddress> getCachedEndpoints(Token t);
+
+ /**
+ * puts an address in the cache for a given token.
+ */
+ public void cacheEndpoint(Token t, ArrayList<InetAddress> addr);
+
+ /**
+ * clears all cache values.
+ */
+ public void clearEndpointCache();
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java?rev=984806&r1=984805&r2=984806&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java Thu Aug 12 14:39:01 2010
@@ -114,7 +114,7 @@ public class PropertyFileSnitch extends
public void reloadConfiguration() throws ConfigurationException
{
hostProperties = resourceToProperties(RACK_PROPERTY_FILENAME);
- invalidateCachedSnitchValues();
+ clearEndpointCache();
}
public static Properties resourceToProperties(String filename) throws ConfigurationException