You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/10/21 22:55:42 UTC

svn commit: r1026138 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/locator/

Author: jbellis
Date: Thu Oct 21 20:55:41 2010
New Revision: 1026138

URL: http://svn.apache.org/viewvc?rev=1026138&view=rev
Log:
move endpoint cache from snitch to strategy
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-1643

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Oct 21 20:55:41 2010
@@ -52,6 +52,7 @@ dev
  * cli support for index queries (CASSANDRA-1635)
  * cli support for updating schema memtable settings (CASSANDRA-1634)
  * reduce automatically chosen memtable sizes by 50% (CASSANDRA-1641)
+ * move endpoint cache from snitch to strategy (CASSANDRA-1643)
 
 
 0.7-beta2

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Oct 21 20:55:41 2010
@@ -709,14 +709,6 @@ public class    DatabaseDescriptor
         return requestSchedulerId;
     }
 
-    public static Class<? extends AbstractReplicationStrategy> getReplicaPlacementStrategyClass(String table)
-    {
-    	KSMetaData meta = tables.get(table);
-    	if (meta == null)
-            throw new RuntimeException(table + " not found. Failure to call loadSchemas() perhaps?");
-        return meta.strategyClass;
-    }
-
     public static KSMetaData getKSMetaData(String table)
     {
         assert table != null;

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java Thu Oct 21 20:55:41 2010
@@ -35,27 +35,6 @@ public abstract class AbstractEndpointSn
 {
     private static final Logger logger = LoggerFactory.getLogger(AbstractEndpointSnitch.class);
     
-    /* list of subscribers that are notified when cached values from this snitch are invalidated */
-    protected List<AbstractReplicationStrategy> subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
-    
-    private final Map<Token, ArrayList<InetAddress>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>();
-    
-    public ArrayList<InetAddress> getCachedEndpoints(Token t)
-    {
-        return cachedEndpoints.get(t);
-    }
-
-    public void cacheEndpoint(Token t, ArrayList<InetAddress> addr)
-    {
-        cachedEndpoints.put(t, addr);
-    }
-
-    public void clearEndpointCache()
-    {
-        logger.debug("clearing cached endpoints");
-        cachedEndpoints.clear();
-    }
-
     public abstract List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress);
     public abstract void sortByProximity(InetAddress address, List<InetAddress> addresses);
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Thu Oct 21 20:55:41 2010
@@ -19,29 +19,23 @@
 
 package org.apache.cassandra.locator;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.net.InetAddress;
 import java.util.*;
 
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.service.*;
-import org.apache.commons.lang.ObjectUtils;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
+import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.*;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.hadoop.util.StringUtils;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 /**
@@ -51,10 +45,10 @@ public abstract class AbstractReplicatio
 {
     private static final Logger logger = LoggerFactory.getLogger(AbstractReplicationStrategy.class);
 
-    public String table;
-    private TokenMetadata tokenMetadata;
+    public final String table;
+    private final TokenMetadata tokenMetadata;
     public final IEndpointSnitch snitch;
-    public Map<String, String> configOptions;
+    public final Map<String, String> configOptions;
 
     AbstractReplicationStrategy(String table, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
     {
@@ -68,6 +62,24 @@ public abstract class AbstractReplicatio
         this.table = table;
     }
 
+    private final Map<Token, ArrayList<InetAddress>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>();
+
+    public ArrayList<InetAddress> getCachedEndpoints(Token t)
+    {
+        return cachedEndpoints.get(t);
+    }
+
+    public void cacheEndpoint(Token t, ArrayList<InetAddress> addr)
+    {
+        cachedEndpoints.put(t, addr);
+    }
+
+    public void clearEndpointCache()
+    {
+        logger.debug("clearing cached endpoints");
+        cachedEndpoints.clear();
+    }
+
     /**
      * get the (possibly cached) endpoints that should store the given Token
      * Note that while the endpoints are conceptually a Set (no duplicates will be included),
@@ -79,13 +91,13 @@ public abstract class AbstractReplicatio
     public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken) throws IllegalStateException
     {
         Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
-        ArrayList<InetAddress> endpoints = snitch.getCachedEndpoints(keyToken);
+        ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
         if (endpoints == null)
         {
             TokenMetadata tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
             keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken);
             endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tokenMetadataClone));
-            snitch.cacheEndpoint(keyToken, endpoints);
+            cacheEndpoint(keyToken, endpoints);
             // calculateNaturalEndpoints should have checked this already, this is a safety
             assert getReplicationFactor() <= endpoints.size() : String.format("endpoints %s generated for RF of %s",
                                                                               Arrays.toString(endpoints.toArray()),
@@ -220,7 +232,7 @@ public abstract class AbstractReplicatio
 
     public void invalidateCachedTokenEndpointValues()
     {
-        snitch.clearEndpointCache();
+        clearEndpointCache();
     }
 
     public static AbstractReplicationStrategy createReplicationStrategy(String table,

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Thu Oct 21 20:55:41 2010
@@ -86,21 +86,6 @@ public class DynamicEndpointSnitch exten
         }
     }
 
-    public ArrayList<InetAddress> getCachedEndpoints(Token t)
-    {
-        return subsnitch.getCachedEndpoints(t);
-    }
-
-    public void cacheEndpoint(Token t, ArrayList<InetAddress> addr)
-    {
-        subsnitch.cacheEndpoint(t, addr);
-    }
-
-    public void clearEndpointCache()
-    {
-        subsnitch.clearEndpointCache();
-    }
-
     public String getRack(InetAddress endpoint)
     {
         return subsnitch.getRack(endpoint);

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java Thu Oct 21 20:55:41 2010
@@ -57,19 +57,4 @@ public interface IEndpointSnitch
      * compares two endpoints in relation to the target endpoint, returning as Comparator.compare would
      */
     public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);
-
-    /**
-     * returns a list of cached endpoints for a given token.
-     */
-    public ArrayList<InetAddress> getCachedEndpoints(Token t);
-
-    /**
-     * puts an address in the cache for a given token.
-     */
-    public void cacheEndpoint(Token t, ArrayList<InetAddress> addr);
-
-    /**
-     * clears all cache values.
-     */
-    public void clearEndpointCache();
-}
+}
\ No newline at end of file

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java Thu Oct 21 20:55:41 2010
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ResourceWatcher;
 import org.apache.cassandra.utils.WrappedRunnable;
@@ -152,6 +153,6 @@ public class PropertyFileSnitch extends 
 
         logger.debug("loaded network topology {}", FBUtilities.toString(reloadedMap));
         endpointMap = reloadedMap;
-        clearEndpointCache();
+        StorageService.instance.getTokenMetadata().invalidateCaches();
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Thu Oct 21 20:55:41 2010
@@ -121,7 +121,7 @@ public class TokenMetadata
                 sortedTokens = sortTokens();
             }
             leavingEndpoints.remove(endpoint);
-            fireTokenToEndpointMapChanged();
+            invalidateCaches();
         }
         finally
         {
@@ -197,7 +197,7 @@ public class TokenMetadata
             tokenToEndpointMap.inverse().remove(endpoint);
             leavingEndpoints.remove(endpoint);
             sortedTokens = sortTokens();
-            fireTokenToEndpointMapChanged();
+            invalidateCaches();
         }
         finally
         {
@@ -450,7 +450,7 @@ public class TokenMetadata
         tokenToEndpointMap.clear();
         leavingEndpoints.clear();
         pendingRanges.clear();
-        fireTokenToEndpointMapChanged();
+        invalidateCaches();
     }
 
     public String toString()
@@ -527,7 +527,7 @@ public class TokenMetadata
         return sb.toString();
     }
 
-    protected void fireTokenToEndpointMapChanged()
+    public void invalidateCaches()
     {
         for (AbstractReplicationStrategy subscriber : subscribers)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Oct 21 20:55:41 2010
@@ -168,7 +168,7 @@ public class StorageService implements I
                                                                                    "request");
 
     /* We use this interface to determine where replicas need to be placed */
-    private Map<String, AbstractReplicationStrategy> replicationStrategies;
+    private final Map<String, AbstractReplicationStrategy> replicationStrategies;
 
     private Set<InetAddress> replicatingNodes;
     private InetAddress removingNode;
@@ -252,10 +252,8 @@ public class StorageService implements I
     public AbstractReplicationStrategy getReplicationStrategy(String table)
     {
         AbstractReplicationStrategy ars = replicationStrategies.get(table);
-        if (ars == null)
-            throw new RuntimeException(String.format("No replica strategy configured for %s", table));
-        else
-            return ars;
+        assert ars != null: String.format("No replica strategy configured for %s", table);
+        return ars;
     }
     
     public void initReplicationStrategy(String table)

Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java Thu Oct 21 20:55:41 2010
@@ -39,19 +39,15 @@ import org.apache.cassandra.service.Stor
 public class SimpleStrategyTest extends SchemaLoader
 {
     @Test
+    public void tryValidTable()
+    {
+        assert StorageService.instance.getReplicationStrategy("Keyspace1") != null;
+    }
+
+    @Test(expected = AssertionError.class)
     public void tryBogusTable()
     {
-        AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy("Keyspace1");
-        assertNotNull(rs);
-        try
-        {
-            rs = StorageService.instance.getReplicationStrategy("SomeBogusTableThatDoesntExist");
-            throw new AssertionError("SS.createReplicationStrategy() should have thrown a RuntimeException.");
-        }
-        catch (RuntimeException ex)
-        {
-            // This exception should be thrown.
-        }
+        StorageService.instance.getReplicationStrategy("SomeBogusTableThatDoesntExist");
     }
 
     @Test