You are viewing a plain text version of this content. (The canonical link was not preserved in this plain-text copy.)
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/10/21 22:55:42 UTC
svn commit: r1026138 - in /cassandra/trunk: ./
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/locator/
src/java/org/apache/cassandra/service/
test/unit/org/apache/cassandra/locator/
Author: jbellis
Date: Thu Oct 21 20:55:41 2010
New Revision: 1026138
URL: http://svn.apache.org/viewvc?rev=1026138&view=rev
Log:
move endpoint cache from snitch to strategy
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-1643
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Oct 21 20:55:41 2010
@@ -52,6 +52,7 @@ dev
* cli support for index queries (CASSANDRA-1635)
* cli support for updating schema memtable settings (CASSANDRA-1634)
* reduce automatically chosen memtable sizes by 50% (CASSANDRA-1641)
+ * move endpoint cache from snitch to strategy (CASSANDRA-1643)
0.7-beta2
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Oct 21 20:55:41 2010
@@ -709,14 +709,6 @@ public class DatabaseDescriptor
return requestSchedulerId;
}
- public static Class<? extends AbstractReplicationStrategy> getReplicaPlacementStrategyClass(String table)
- {
- KSMetaData meta = tables.get(table);
- if (meta == null)
- throw new RuntimeException(table + " not found. Failure to call loadSchemas() perhaps?");
- return meta.strategyClass;
- }
-
public static KSMetaData getKSMetaData(String table)
{
assert table != null;
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java Thu Oct 21 20:55:41 2010
@@ -35,27 +35,6 @@ public abstract class AbstractEndpointSn
{
private static final Logger logger = LoggerFactory.getLogger(AbstractEndpointSnitch.class);
- /* list of subscribers that are notified when cached values from this snitch are invalidated */
- protected List<AbstractReplicationStrategy> subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
-
- private final Map<Token, ArrayList<InetAddress>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>();
-
- public ArrayList<InetAddress> getCachedEndpoints(Token t)
- {
- return cachedEndpoints.get(t);
- }
-
- public void cacheEndpoint(Token t, ArrayList<InetAddress> addr)
- {
- cachedEndpoints.put(t, addr);
- }
-
- public void clearEndpointCache()
- {
- logger.debug("clearing cached endpoints");
- cachedEndpoints.clear();
- }
-
public abstract List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress);
public abstract void sortByProximity(InetAddress address, List<InetAddress> addresses);
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Thu Oct 21 20:55:41 2010
@@ -19,29 +19,23 @@
package org.apache.cassandra.locator;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.util.*;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.service.*;
-import org.apache.commons.lang.ObjectUtils;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
+import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.*;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.hadoop.util.StringUtils;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
/**
@@ -51,10 +45,10 @@ public abstract class AbstractReplicatio
{
private static final Logger logger = LoggerFactory.getLogger(AbstractReplicationStrategy.class);
- public String table;
- private TokenMetadata tokenMetadata;
+ public final String table;
+ private final TokenMetadata tokenMetadata;
public final IEndpointSnitch snitch;
- public Map<String, String> configOptions;
+ public final Map<String, String> configOptions;
AbstractReplicationStrategy(String table, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
{
@@ -68,6 +62,24 @@ public abstract class AbstractReplicatio
this.table = table;
}
+ private final Map<Token, ArrayList<InetAddress>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>();
+
+ public ArrayList<InetAddress> getCachedEndpoints(Token t)
+ {
+ return cachedEndpoints.get(t);
+ }
+
+ public void cacheEndpoint(Token t, ArrayList<InetAddress> addr)
+ {
+ cachedEndpoints.put(t, addr);
+ }
+
+ public void clearEndpointCache()
+ {
+ logger.debug("clearing cached endpoints");
+ cachedEndpoints.clear();
+ }
+
/**
* get the (possibly cached) endpoints that should store the given Token
* Note that while the endpoints are conceptually a Set (no duplicates will be included),
@@ -79,13 +91,13 @@ public abstract class AbstractReplicatio
public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken) throws IllegalStateException
{
Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
- ArrayList<InetAddress> endpoints = snitch.getCachedEndpoints(keyToken);
+ ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
if (endpoints == null)
{
TokenMetadata tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken);
endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tokenMetadataClone));
- snitch.cacheEndpoint(keyToken, endpoints);
+ cacheEndpoint(keyToken, endpoints);
// calculateNaturalEndpoints should have checked this already, this is a safety
assert getReplicationFactor() <= endpoints.size() : String.format("endpoints %s generated for RF of %s",
Arrays.toString(endpoints.toArray()),
@@ -220,7 +232,7 @@ public abstract class AbstractReplicatio
public void invalidateCachedTokenEndpointValues()
{
- snitch.clearEndpointCache();
+ clearEndpointCache();
}
public static AbstractReplicationStrategy createReplicationStrategy(String table,
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Thu Oct 21 20:55:41 2010
@@ -86,21 +86,6 @@ public class DynamicEndpointSnitch exten
}
}
- public ArrayList<InetAddress> getCachedEndpoints(Token t)
- {
- return subsnitch.getCachedEndpoints(t);
- }
-
- public void cacheEndpoint(Token t, ArrayList<InetAddress> addr)
- {
- subsnitch.cacheEndpoint(t, addr);
- }
-
- public void clearEndpointCache()
- {
- subsnitch.clearEndpointCache();
- }
-
public String getRack(InetAddress endpoint)
{
return subsnitch.getRack(endpoint);
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java Thu Oct 21 20:55:41 2010
@@ -57,19 +57,4 @@ public interface IEndpointSnitch
* compares two endpoints in relation to the target endpoint, returning as Comparator.compare would
*/
public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);
-
- /**
- * returns a list of cached endpoints for a given token.
- */
- public ArrayList<InetAddress> getCachedEndpoints(Token t);
-
- /**
- * puts an address in the cache for a given token.
- */
- public void cacheEndpoint(Token t, ArrayList<InetAddress> addr);
-
- /**
- * clears all cache values.
- */
- public void clearEndpointCache();
-}
+}
\ No newline at end of file
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java Thu Oct 21 20:55:41 2010
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ResourceWatcher;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -152,6 +153,6 @@ public class PropertyFileSnitch extends
logger.debug("loaded network topology {}", FBUtilities.toString(reloadedMap));
endpointMap = reloadedMap;
- clearEndpointCache();
+ StorageService.instance.getTokenMetadata().invalidateCaches();
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Thu Oct 21 20:55:41 2010
@@ -121,7 +121,7 @@ public class TokenMetadata
sortedTokens = sortTokens();
}
leavingEndpoints.remove(endpoint);
- fireTokenToEndpointMapChanged();
+ invalidateCaches();
}
finally
{
@@ -197,7 +197,7 @@ public class TokenMetadata
tokenToEndpointMap.inverse().remove(endpoint);
leavingEndpoints.remove(endpoint);
sortedTokens = sortTokens();
- fireTokenToEndpointMapChanged();
+ invalidateCaches();
}
finally
{
@@ -450,7 +450,7 @@ public class TokenMetadata
tokenToEndpointMap.clear();
leavingEndpoints.clear();
pendingRanges.clear();
- fireTokenToEndpointMapChanged();
+ invalidateCaches();
}
public String toString()
@@ -527,7 +527,7 @@ public class TokenMetadata
return sb.toString();
}
- protected void fireTokenToEndpointMapChanged()
+ public void invalidateCaches()
{
for (AbstractReplicationStrategy subscriber : subscribers)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Oct 21 20:55:41 2010
@@ -168,7 +168,7 @@ public class StorageService implements I
"request");
/* We use this interface to determine where replicas need to be placed */
- private Map<String, AbstractReplicationStrategy> replicationStrategies;
+ private final Map<String, AbstractReplicationStrategy> replicationStrategies;
private Set<InetAddress> replicatingNodes;
private InetAddress removingNode;
@@ -252,10 +252,8 @@ public class StorageService implements I
public AbstractReplicationStrategy getReplicationStrategy(String table)
{
AbstractReplicationStrategy ars = replicationStrategies.get(table);
- if (ars == null)
- throw new RuntimeException(String.format("No replica strategy configured for %s", table));
- else
- return ars;
+ assert ars != null: String.format("No replica strategy configured for %s", table);
+ return ars;
}
public void initReplicationStrategy(String table)
Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java?rev=1026138&r1=1026137&r2=1026138&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java Thu Oct 21 20:55:41 2010
@@ -39,19 +39,15 @@ import org.apache.cassandra.service.Stor
public class SimpleStrategyTest extends SchemaLoader
{
@Test
+ public void tryValidTable()
+ {
+ assert StorageService.instance.getReplicationStrategy("Keyspace1") != null;
+ }
+
+ @Test(expected = AssertionError.class)
public void tryBogusTable()
{
- AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy("Keyspace1");
- assertNotNull(rs);
- try
- {
- rs = StorageService.instance.getReplicationStrategy("SomeBogusTableThatDoesntExist");
- throw new AssertionError("SS.createReplicationStrategy() should have thrown a RuntimeException.");
- }
- catch (RuntimeException ex)
- {
- // This exception should be thrown.
- }
+ StorageService.instance.getReplicationStrategy("SomeBogusTableThatDoesntExist");
}
@Test