You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2010/08/12 16:39:01 UTC

svn commit: r984806 - in /cassandra/trunk/src/java/org/apache/cassandra/locator: AbstractEndpointSnitch.java AbstractReplicationStrategy.java DynamicEndpointSnitch.java IEndpointSnitch.java PropertyFileSnitch.java

Author: gdusbabek
Date: Thu Aug 12 14:39:01 2010
New Revision: 984806

URL: http://svn.apache.org/viewvc?rev=984806&view=rev
Log:
move endpoint cache from ARS to AES. patch by gdusbabek, reviewed by jbellis. CASSANDRA-1350

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java?rev=984806&r1=984805&r2=984806&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java Thu Aug 12 14:39:01 2010
@@ -19,26 +19,41 @@
 
 package org.apache.cassandra.locator;
 
+import org.apache.cassandra.dht.Token;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 public abstract class AbstractEndpointSnitch implements IEndpointSnitch
 {
+    private static final Logger logger = LoggerFactory.getLogger(AbstractEndpointSnitch.class);
+    
     /* list of subscribers that are notified when cached values from this snitch are invalidated */
     protected List<AbstractReplicationStrategy> subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
+    
+    private volatile Map<Token, ArrayList<InetAddress>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>();
+    
+    public ArrayList<InetAddress> getCachedEndpoints(Token t)
+    {
+        return cachedEndpoints.get(t);
+    }
 
-    public void register(AbstractReplicationStrategy subscriber)
+    public void cacheEndpoint(Token t, ArrayList<InetAddress> addr)
     {
-        subscribers.add(subscriber);
+        cachedEndpoints.put(t, addr);
     }
 
-    protected void invalidateCachedSnitchValues()
+    public void clearEndpointCache()
     {
-        for (AbstractReplicationStrategy subscriber : subscribers)
-            subscriber.invalidateCachedSnitchValues();
+        logger.debug("clearing cached endpoints");
+        cachedEndpoints.clear();
     }
 
     public abstract List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress);

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=984806&r1=984805&r2=984806&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Thu Aug 12 14:39:01 2010
@@ -52,7 +52,6 @@ public abstract class AbstractReplicatio
     public String table;
     private TokenMetadata tokenMetadata;
     public final IEndpointSnitch snitch;
-    private volatile Map<Token, ArrayList<InetAddress>> cachedEndpoints;
     public Map<String, String> configOptions;
 
     AbstractReplicationStrategy(String table, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
@@ -62,7 +61,6 @@ public abstract class AbstractReplicatio
         assert tokenMetadata != null;
         this.tokenMetadata = tokenMetadata;
         this.snitch = snitch;
-        cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>();
         this.tokenMetadata.register(this);
         this.configOptions = configOptions;
         this.table = table;
@@ -80,13 +78,13 @@ public abstract class AbstractReplicatio
     {
         int replicas = getReplicationFactor();
         Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
-        ArrayList<InetAddress> endpoints = cachedEndpoints.get(keyToken);
+        ArrayList<InetAddress> endpoints = snitch.getCachedEndpoints(keyToken);
         if (endpoints == null)
         {
             TokenMetadata tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
             keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken);
             endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tokenMetadataClone));
-            cachedEndpoints.put(keyToken, endpoints);
+            snitch.cacheEndpoint(keyToken, endpoints);
         }
 
         // calculateNaturalEndpoints should have checked this already, this is a safety
@@ -218,20 +216,9 @@ public abstract class AbstractReplicatio
         return new QuorumResponseHandler(responseResolver, consistencyLevel, table);
     }
 
-    protected void clearCachedEndpoints()
-    {
-        logger.debug("clearing cached endpoints");
-        cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>();
-    }
-
     public void invalidateCachedTokenEndpointValues()
     {
-        clearCachedEndpoints();
-    }
-
-    public void invalidateCachedSnitchValues()
-    {
-        clearCachedEndpoints();
+        snitch.clearEndpointCache();
     }
 
     public static AbstractReplicationStrategy createReplicationStrategy(String table,

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=984806&r1=984805&r2=984806&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Thu Aug 12 14:39:01 2010
@@ -28,16 +28,10 @@ import java.lang.management.ManagementFa
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.ResponseVerbHandler;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.AbstractStatsDeque;
-import org.apache.cassandra.locator.IEndpointSnitch;
-import org.apache.cassandra.locator.ILatencyPublisher;
-import org.apache.cassandra.locator.ILatencySubscriber;
-import org.apache.cassandra.locator.AbstractEndpointSnitch;
-import org.apache.cassandra.locator.DynamicEndpointSnitchMBean;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -90,6 +84,21 @@ public class DynamicEndpointSnitch exten
         }
     }
 
+    public ArrayList<InetAddress> getCachedEndpoints(Token t)
+    {
+        return subsnitch.getCachedEndpoints(t);
+    }
+
+    public void cacheEndpoint(Token t, ArrayList<InetAddress> addr)
+    {
+        subsnitch.cacheEndpoint(t, addr);
+    }
+
+    public void clearEndpointCache()
+    {
+        subsnitch.clearEndpointCache();
+    }
+
     public String getRack(InetAddress endpoint)
     {
         return subsnitch.getRack(endpoint);

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java?rev=984806&r1=984805&r2=984806&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java Thu Aug 12 14:39:01 2010
@@ -18,7 +18,10 @@
 
 package org.apache.cassandra.locator;
 
+import org.apache.cassandra.dht.Token;
+
 import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
@@ -51,13 +54,22 @@ public interface IEndpointSnitch
     public List<InetAddress> sortByProximity(InetAddress address, List<InetAddress> addresses);
 
     /**
-     * register to receive notification when the endpoint snitch has changed the answers it was providing.
-     * @param subscriber the subscriber to notify
+     * compares two endpoints in relation to the target endpoint, returning as Comparator.compare would
      */
-    public void register(AbstractReplicationStrategy subscriber);
+    public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);
 
     /**
-     * compares two endpoints in relation to the target endpoint, returning as Comparator.compare would
+     * returns a list of cached endpoints for a given token.
      */
-    public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);
+    public ArrayList<InetAddress> getCachedEndpoints(Token t);
+
+    /**
+     * puts an address in the cache for a given token.
+     */
+    public void cacheEndpoint(Token t, ArrayList<InetAddress> addr);
+
+    /**
+     * clears all cache values.
+     */
+    public void clearEndpointCache();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java?rev=984806&r1=984805&r2=984806&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java Thu Aug 12 14:39:01 2010
@@ -114,7 +114,7 @@ public class PropertyFileSnitch extends 
     public void reloadConfiguration() throws ConfigurationException
     {
         hostProperties = resourceToProperties(RACK_PROPERTY_FILENAME);
-        invalidateCachedSnitchValues();
+        clearEndpointCache();
     }
 
     public static Properties resourceToProperties(String filename) throws ConfigurationException