You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2010/02/04 16:21:33 UTC
svn commit: r906521 [1/3] - in /incubator/cassandra/trunk: ./ conf/
contrib/circuit/src/org/apache/cassandra/contrib/circuit/
src/java/org/apache/cassandra/client/ src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/db/ src/java/org/apa...
Author: gdusbabek
Date: Thu Feb 4 15:21:31 2010
New Revision: 906521
URL: http://svn.apache.org/viewvc?rev=906521&view=rev
Log:
add per-keyspace replication factor and replication strategy. Patch by Gary Dusbabek, reviewed by Jaakko Laine. CASSANDRA-620
Modified:
incubator/cassandra/trunk/CHANGES.txt
incubator/cassandra/trunk/conf/storage-conf.xml
incubator/cassandra/trunk/contrib/circuit/src/org/apache/cassandra/contrib/circuit/RingModel.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/ClusterCmd.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
incubator/cassandra/trunk/test/conf/storage-conf.xml
incubator/cassandra/trunk/test/system/test_server.py
incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java
Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Thu Feb 4 15:21:31 2010
@@ -19,6 +19,7 @@
* report latency and cache hit rate statistics with lifetime totals
instead of average over the last minute (CASSANDRA-702)
* support get_range_slice for RandomPartitioner (CASSANDRA-745)
+ * per-keyspace replication factor and replication strategy (CASSANDRA-620)
0.5.1
Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Thu Feb 4 15:21:31 2010
@@ -111,6 +111,30 @@
RowsCached="1000"
KeysCachedFraction="0"
Comment="A column family with supercolumns, whose column and subcolumn names are UTF8 strings"/>
+
+ <!--
+ ~ ReplicaPlacementStrategy: Set this to a class that extends
+ ~ AbstractReplicationStrategy to change the way the node picker works
+ ~ for this keyspace. Out of the box, Cassandra provides
+ ~ org.apache.cassandra.locator.RackUnawareStrategy and
+ ~ org.apache.cassandra.locator.RackAwareStrategy (place one replica in
+ ~ a different datacenter, and the others on different racks in the same
+ ~ one.)
+ -->
+ <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+
+ <!-- Number of replicas of the data -->
+ <ReplicationFactor>1</ReplicationFactor>
+
+ <!--
+ ~ EndPointSnitch: Set this to a class that implements
+ ~ IEndPointSnitch; the snitch lets Cassandra know enough
+ ~ about your network topology to route requests efficiently.
+ ~ Out of the box, Cassandra provides org.apache.cassandra.locator.EndPointSnitch,
+ ~ and PropertyFileEndPointSnitch is available in contrib/.
+ -->
+ <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
+
</Keyspace>
</Keyspaces>
@@ -157,29 +181,6 @@
<InitialToken></InitialToken>
<!--
- ~ EndPointSnitch: Setting this to the class that implements
- ~ AbstractEndpointSnitch, which lets Cassandra know enough
- ~ about your network topology to route requests efficiently.
- ~ Out of the box, Cassandra provides org.apache.cassandra.locator.EndPointSnitch,
- ~ and PropertyFileEndPointSnitch is available in contrib/.
- -->
- <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
-
- <!--
- ~ Strategy: Setting this to the class that implements
- ~ IReplicaPlacementStrategy will change the way the node picker works.
- ~ Out of the box, Cassandra provides
- ~ org.apache.cassandra.locator.RackUnawareStrategy and
- ~ org.apache.cassandra.locator.RackAwareStrategy (place one replica in
- ~ a different datacenter, and the others on different racks in the same
- ~ one.)
- -->
- <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
-
- <!-- Number of replicas of the data -->
- <ReplicationFactor>1</ReplicationFactor>
-
- <!--
~ Directories: Specify where Cassandra should store different data on
~ disk. Keep the data disks and the CommitLog disks separate for best
~ performance
Modified: incubator/cassandra/trunk/contrib/circuit/src/org/apache/cassandra/contrib/circuit/RingModel.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/circuit/src/org/apache/cassandra/contrib/circuit/RingModel.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/contrib/circuit/src/org/apache/cassandra/contrib/circuit/RingModel.java (original)
+++ incubator/cassandra/trunk/contrib/circuit/src/org/apache/cassandra/contrib/circuit/RingModel.java Thu Feb 4 15:21:31 2010
@@ -101,7 +101,7 @@
"Invalid ObjectName? Please report this as a bug.", e);
}
- Map<Range, List<String>> rangeMap = ssProxy.getRangeToEndPointMap();
+ Map<Range, List<String>> rangeMap = ssProxy.getRangeToEndPointMap(null);
List<Range> ranges = new ArrayList<Range>(rangeMap.keySet());
Collections.sort(ranges);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java Thu Feb 4 15:21:31 2010
@@ -48,9 +48,9 @@
final private static Logger logger_ = Logger.getLogger(RingCache.class);
private Set<String> seeds_ = new HashSet<String>();
- final private int port_=DatabaseDescriptor.getThriftPort();
- private volatile AbstractReplicationStrategy nodePicker_;
+ final private int port_= DatabaseDescriptor.getThriftPort();
final private static IPartitioner partitioner_ = DatabaseDescriptor.getPartitioner();
+ private TokenMetadata tokenMetadata;
public RingCache()
{
@@ -89,8 +89,7 @@
}
}
- TokenMetadata tokenMetadata = new TokenMetadata(tokenEndpointMap);
- nodePicker_ = StorageService.getReplicationStrategy(tokenMetadata);
+ tokenMetadata = new TokenMetadata(tokenEndpointMap);
break;
}
@@ -102,8 +101,11 @@
}
}
- public List<InetAddress> getEndPoint(String key)
+ public List<InetAddress> getEndPoint(String table, String key)
{
- return nodePicker_.getNaturalEndpoints(partitioner_.getToken(key));
+ if (tokenMetadata == null)
+ throw new RuntimeException("Must refresh endpoints before looking up a key.");
+ AbstractReplicationStrategy strat = StorageService.getReplicationStrategy(tokenMetadata, table);
+ return strat.getNaturalEndpoints(partitioner_.getToken(key), table);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Feb 4 15:21:31 2010
@@ -69,7 +69,6 @@
private static InetAddress listenAddress_; // leave null so we can fall through to getLocalHost
private static InetAddress thriftAddress_;
private static String clusterName_ = "Test";
- private static int replicationFactor_ = 3;
private static long rpcTimeoutInMillis_ = 2000;
private static Set<InetAddress> seeds_ = new HashSet<InetAddress>();
/* Keeps the list of data file directories */
@@ -96,12 +95,17 @@
* corresponding meta data for that column family.
*/
private static Map<String, Map<String, CFMetaData>> tableToCFMetaDataMap_;
+
+ // map tables to replication strategies.
+ private static Map<String, Class<AbstractReplicationStrategy>> replicationStrategyClasses_;
+
+ // map tables to replication factors.
+ private static Map<String, Integer> replicationFactors_;
+
/* Hashing strategy Random or OPHF */
private static IPartitioner partitioner_;
- private static IEndPointSnitch endPointSnitch_;
-
- private static Class<AbstractReplicationStrategy> replicaPlacementStrategyClass_;
+ private static Map<String, IEndPointSnitch> endPointSnitches_;
/* if the size of columns or super-columns are more than this, indexing will kick in */
private static int columnIndexSizeInKB_;
@@ -256,22 +260,6 @@
throw new ConfigurationException("Invalid partitioner class " + partitionerClassName);
}
- /* end point snitch */
- String endPointSnitchClassName = xmlUtils.getNodeValue("/Storage/EndPointSnitch");
- if (endPointSnitchClassName == null)
- {
- throw new ConfigurationException("Missing endpointsnitch directive /Storage/EndPointSnitch");
- }
- try
- {
- Class cls = Class.forName(endPointSnitchClassName);
- endPointSnitch_ = (IEndPointSnitch) cls.getConstructor().newInstance();
- }
- catch (ClassNotFoundException e)
- {
- throw new ConfigurationException("Invalid endpointsnitch class " + endPointSnitchClassName);
- }
-
/* JobTracker address */
jobTrackerHost_ = xmlUtils.getNodeValue("/Storage/JobTrackerHost");
@@ -284,11 +272,6 @@
initialToken_ = xmlUtils.getNodeValue("/Storage/InitialToken");
- /* Data replication factor */
- String replicationFactor = xmlUtils.getNodeValue("/Storage/ReplicationFactor");
- if ( replicationFactor != null )
- replicationFactor_ = Integer.parseInt(replicationFactor);
-
/* RPC Timeout */
String rpcTimeoutInMillis = xmlUtils.getNodeValue("/Storage/RpcTimeoutInMillis");
if ( rpcTimeoutInMillis != null )
@@ -457,21 +440,9 @@
CommitLog.setSegmentSize(Integer.parseInt(value) * 1024 * 1024);
tableToCFMetaDataMap_ = new HashMap<String, Map<String, CFMetaData>>();
-
- /* See which replica placement strategy to use */
- String replicaPlacementStrategyClassName = xmlUtils.getNodeValue("/Storage/ReplicaPlacementStrategy");
- if (replicaPlacementStrategyClassName == null)
- {
- throw new ConfigurationException("Missing replicaplacementstrategy directive /Storage/ReplicaPlacementStrategy");
- }
- try
- {
- replicaPlacementStrategyClass_ = (Class<AbstractReplicationStrategy>) Class.forName(replicaPlacementStrategyClassName);
- }
- catch (ClassNotFoundException e)
- {
- throw new ConfigurationException("Invalid replicaplacementstrategy class " + replicaPlacementStrategyClassName);
- }
+ replicationFactors_ = new HashMap<String, Integer>();
+ replicationStrategyClasses_ = new HashMap<String, Class<AbstractReplicationStrategy>>();
+ endPointSnitches_ = new HashMap<String, IEndPointSnitch>();
/* Read the table related stuff from config */
NodeList tables = xmlUtils.getRequestedNodeList("/Storage/Keyspaces/Keyspace");
@@ -493,6 +464,45 @@
tables_.add(tName);
tableToCFMetaDataMap_.put(tName, new HashMap<String, CFMetaData>());
+ /* See which replica placement strategy to use */
+ String replicaPlacementStrategyClassName = xmlUtils.getNodeValue("/Storage/Keyspaces/Keyspace[@Name='" + tName + "']/ReplicaPlacementStrategy");
+ if (replicaPlacementStrategyClassName == null)
+ {
+ throw new ConfigurationException("Missing replicaplacementstrategy directive for " + tName);
+ }
+ try
+ {
+ Class cls = (Class<AbstractReplicationStrategy>) Class.forName(replicaPlacementStrategyClassName);
+ replicationStrategyClasses_.put(tName, cls);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new ConfigurationException("Invalid replicaplacementstrategy class " + replicaPlacementStrategyClassName);
+ }
+
+ /* Data replication factor */
+ String replicationFactor = xmlUtils.getNodeValue("/Storage/Keyspaces/Keyspace[@Name='" + tName + "']/ReplicationFactor");
+ if (replicationFactor == null)
+ throw new ConfigurationException("Missing replicationfactor directory for keyspace " + tName);
+ else
+ replicationFactors_.put(tName, Integer.parseInt(replicationFactor));
+
+ /* end point snitch */
+ String endPointSnitchClassName = xmlUtils.getNodeValue("/Storage/Keyspaces/Keyspace[@Name='" + tName + "']/EndPointSnitch");
+ if (endPointSnitchClassName == null)
+ {
+ throw new ConfigurationException("Missing endpointsnitch directive for keyspace " + tName);
+ }
+ try
+ {
+ Class cls = Class.forName(endPointSnitchClassName);
+ endPointSnitches_.put(tName, (IEndPointSnitch)cls.getConstructor().newInstance());
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new ConfigurationException("Invalid endpointsnitch class " + endPointSnitchClassName);
+ }
+
String xqlTable = "/Storage/Keyspaces/Keyspace[@Name='" + tName + "']/";
NodeList columnFamilies = xmlUtils.getRequestedNodeList(xqlTable + "ColumnFamily");
@@ -729,14 +739,14 @@
return partitioner_;
}
- public static IEndPointSnitch getEndPointSnitch()
+ public static IEndPointSnitch getEndPointSnitch(String table)
{
- return endPointSnitch_;
+ return endPointSnitches_.get(table);
}
- public static Class<AbstractReplicationStrategy> getReplicaPlacementStrategyClass()
+ public static Class<AbstractReplicationStrategy> getReplicaPlacementStrategyClass(String table)
{
- return replicaPlacementStrategyClass_;
+ return replicationStrategyClasses_.get(table);
}
public static String getJobTrackerAddress()
@@ -851,14 +861,14 @@
return thriftPort_;
}
- public static int getReplicationFactor()
+ public static int getReplicationFactor(String table)
{
- return replicationFactor_;
+ return replicationFactors_.get(table);
}
- public static int getQuorum()
+ public static int getQuorum(String table)
{
- return (replicationFactor_ / 2) + 1;
+ return (replicationFactors_.get(table) / 2) + 1;
}
public static long getRpcTimeout()
@@ -1079,8 +1089,9 @@
/**
* For testing purposes.
*/
- static void setReplicationFactorUnsafe(int factor)
+ static void setReplicationFactorUnsafe(String table, int factor)
{
- replicationFactor_ = factor;
+ replicationFactors_.remove(table);
+ replicationFactors_.put(table, factor);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Thu Feb 4 15:21:31 2010
@@ -433,7 +433,7 @@
private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
{
Collection<SSTableReader> originalSSTables = cfs.getSSTables();
- List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables, StorageService.instance.getLocalRanges(), null);
+ List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables, StorageService.instance.getLocalRanges(cfs.getTable().name), null);
if (!sstables.isEmpty())
{
cfs.replaceCompactedSSTables(originalSSTables, sstables);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Thu Feb 4 15:21:31 2010
@@ -124,7 +124,7 @@
rm.add(cf);
}
Message message = rm.makeRowMutationMessage();
- WriteResponseHandler responseHandler = new WriteResponseHandler(1);
+ WriteResponseHandler responseHandler = new WriteResponseHandler(1, tableName);
MessagingService.instance.sendRR(message, new InetAddress[] { endPoint }, responseHandler);
try
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Thu Feb 4 15:21:31 2010
@@ -106,7 +106,7 @@
/* Do read repair if header of the message says so */
if (message.getHeader(ReadCommand.DO_REPAIR) != null)
{
- List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.key);
+ List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
/* Remove the local storage endpoint from the list. */
endpoints.remove(FBUtilities.getLocalAddress());
if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Thu Feb 4 15:21:31 2010
@@ -24,6 +24,7 @@
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
+ import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.commons.lang.ArrayUtils;
@@ -51,14 +52,12 @@
/* tokens of the nodes being bootstrapped. */
protected final Token token;
protected final TokenMetadata tokenMetadata;
- private final AbstractReplicationStrategy replicationStrategy;
- public BootStrapper(AbstractReplicationStrategy rs, InetAddress address, Token token, TokenMetadata tmd)
+ public BootStrapper(InetAddress address, Token token, TokenMetadata tmd)
{
assert address != null;
assert token != null;
- replicationStrategy = rs;
this.address = address;
this.token = token;
tokenMetadata = tmd;
@@ -70,16 +69,20 @@
{
public void run()
{
- Multimap<Range, InetAddress> rangesWithSourceTarget = getRangesWithSources();
if (logger.isDebugEnabled())
logger.debug("Beginning bootstrap process");
- /* Send messages to respective folks to stream data over to me */
- for (Map.Entry<InetAddress, Collection<Range>> entry : getWorkMap(rangesWithSourceTarget).asMap().entrySet())
+ for (String table : DatabaseDescriptor.getNonSystemTables())
{
- InetAddress source = entry.getKey();
- for (String table : DatabaseDescriptor.getNonSystemTables())
+ Multimap<Range, InetAddress> rangesWithSourceTarget = getRangesWithSources(table);
+ /* Send messages to respective folks to stream data over to me */
+ for (Map.Entry<InetAddress, Collection<Range>> entry : getWorkMap(rangesWithSourceTarget).asMap().entrySet())
+ {
+ InetAddress source = entry.getKey();
StorageService.instance.addBootstrapSource(source, table);
- StreamIn.requestRanges(source, entry.getValue());
+ if (logger.isDebugEnabled())
+ logger.debug("Requesting from " + source + " ranges " + StringUtils.join(entry.getValue(), ", "));
+ StreamIn.requestRanges(source, table, entry.getValue());
+ }
}
}
}, "Boostrap requester").start();
@@ -144,20 +147,21 @@
}
/** get potential sources for each range, ordered by proximity (as determined by EndPointSnitch) */
- Multimap<Range, InetAddress> getRangesWithSources()
+ Multimap<Range, InetAddress> getRangesWithSources(String table)
{
assert tokenMetadata.sortedTokens().size() > 0;
- Collection<Range> myRanges = replicationStrategy.getPendingAddressRanges(tokenMetadata, token, address);
+ final AbstractReplicationStrategy strat = StorageService.instance.getReplicationStrategy(table);
+ Collection<Range> myRanges = strat.getPendingAddressRanges(tokenMetadata, token, address, table);
Multimap<Range, InetAddress> myRangeAddresses = ArrayListMultimap.create();
- Multimap<Range, InetAddress> rangeAddresses = replicationStrategy.getRangeAddresses(tokenMetadata);
+ Multimap<Range, InetAddress> rangeAddresses = strat.getRangeAddresses(tokenMetadata, table);
for (Range range : rangeAddresses.keySet())
{
for (Range myRange : myRanges)
{
if (range.contains(myRange))
{
- List<InetAddress> preferred = DatabaseDescriptor.getEndPointSnitch().getSortedListByProximity(address, rangeAddresses.get(range));
+ List<InetAddress> preferred = DatabaseDescriptor.getEndPointSnitch(table).getSortedListByProximity(address, rangeAddresses.get(range));
myRangeAddresses.putAll(myRange, preferred);
break;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Thu Feb 4 15:21:31 2010
@@ -21,6 +21,7 @@
import java.net.InetAddress;
import java.util.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.log4j.Logger;
import com.google.common.collect.HashMultimap;
@@ -43,25 +44,25 @@
{
protected static final Logger logger_ = Logger.getLogger(AbstractReplicationStrategy.class);
- protected TokenMetadata tokenMetadata_;
- protected int replicas_;
+ private TokenMetadata tokenMetadata_;
+ protected final IEndPointSnitch snitch_;
- AbstractReplicationStrategy(TokenMetadata tokenMetadata, int replicas)
+ AbstractReplicationStrategy(TokenMetadata tokenMetadata, IEndPointSnitch snitch)
{
tokenMetadata_ = tokenMetadata;
- replicas_ = replicas;
+ snitch_ = snitch;
}
- public abstract ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata);
+ public abstract ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table);
- public WriteResponseHandler getWriteResponseHandler(int blockFor, ConsistencyLevel consistency_level)
+ public WriteResponseHandler getWriteResponseHandler(int blockFor, ConsistencyLevel consistency_level, String table)
{
- return new WriteResponseHandler(blockFor);
+ return new WriteResponseHandler(blockFor, table);
}
- public ArrayList<InetAddress> getNaturalEndpoints(Token token)
+ public ArrayList<InetAddress> getNaturalEndpoints(Token token, String table)
{
- return getNaturalEndpoints(token, tokenMetadata_);
+ return getNaturalEndpoints(token, tokenMetadata_, table);
}
/*
@@ -69,9 +70,9 @@
* on which the data is being placed and the value is the
* endpoint to which it should be forwarded.
*/
- public Map<InetAddress, InetAddress> getHintedEndpoints(Token token, Collection<InetAddress> naturalEndpoints)
+ public Map<InetAddress, InetAddress> getHintedEndpoints(Token token, String table, Collection<InetAddress> naturalEndpoints)
{
- return getHintedMapForEndpoints(getWriteEndpoints(token, naturalEndpoints));
+ return getHintedMapForEndpoints(table, getWriteEndpoints(token, table, naturalEndpoints));
}
/**
@@ -81,15 +82,16 @@
* Thus, this method may return more nodes than the Replication Factor.
*
* Only ReplicationStrategy should care about this method (higher level users should only ask for Hinted).
+ * todo: this method should be moved into TokenMetadata.
*/
- public Collection<InetAddress> getWriteEndpoints(Token token, Collection<InetAddress> naturalEndpoints)
+ public Collection<InetAddress> getWriteEndpoints(Token token, String table, Collection<InetAddress> naturalEndpoints)
{
- if (tokenMetadata_.getPendingRanges().isEmpty())
+ if (tokenMetadata_.getPendingRanges(table).isEmpty())
return naturalEndpoints;
List<InetAddress> endpoints = new ArrayList<InetAddress>(naturalEndpoints);
- for (Map.Entry<Range, Collection<InetAddress>> entry : tokenMetadata_.getPendingRanges().entrySet())
+ for (Map.Entry<Range, Collection<InetAddress>> entry : tokenMetadata_.getPendingRanges(table).entrySet())
{
if (entry.getKey().contains(token))
{
@@ -107,12 +109,12 @@
*
* A destination node may be the destination for multiple targets.
*/
- private Map<InetAddress, InetAddress> getHintedMapForEndpoints(Collection<InetAddress> targets)
+ private Map<InetAddress, InetAddress> getHintedMapForEndpoints(String table, Collection<InetAddress> targets)
{
Set<InetAddress> usedEndpoints = new HashSet<InetAddress>();
Map<InetAddress, InetAddress> map = new HashMap<InetAddress, InetAddress>();
- IEndPointSnitch endPointSnitch = StorageService.instance.getEndPointSnitch();
+ IEndPointSnitch endPointSnitch = DatabaseDescriptor.getEndPointSnitch(table);
Set<InetAddress> liveNodes = Gossiper.instance.getLiveMembers();
for (InetAddress ep : targets)
@@ -160,14 +162,14 @@
this is fine as long as we don't use this on any critical path.
(fixing this would probably require merging tokenmetadata into replicationstrategy, so we could cache/invalidate cleanly.)
*/
- public Multimap<InetAddress, Range> getAddressRanges(TokenMetadata metadata)
+ public Multimap<InetAddress, Range> getAddressRanges(TokenMetadata metadata, String table)
{
Multimap<InetAddress, Range> map = HashMultimap.create();
for (Token token : metadata.sortedTokens())
{
Range range = metadata.getPrimaryRangeFor(token);
- for (InetAddress ep : getNaturalEndpoints(token, metadata))
+ for (InetAddress ep : getNaturalEndpoints(token, metadata, table))
{
map.put(ep, range);
}
@@ -176,14 +178,14 @@
return map;
}
- public Multimap<Range, InetAddress> getRangeAddresses(TokenMetadata metadata)
+ public Multimap<Range, InetAddress> getRangeAddresses(TokenMetadata metadata, String table)
{
Multimap<Range, InetAddress> map = HashMultimap.create();
for (Token token : metadata.sortedTokens())
{
Range range = metadata.getPrimaryRangeFor(token);
- for (InetAddress ep : getNaturalEndpoints(token, metadata))
+ for (InetAddress ep : getNaturalEndpoints(token, metadata, table))
{
map.put(range, ep);
}
@@ -192,16 +194,16 @@
return map;
}
- public Multimap<InetAddress, Range> getAddressRanges()
+ public Multimap<InetAddress, Range> getAddressRanges(String table)
{
- return getAddressRanges(tokenMetadata_);
+ return getAddressRanges(tokenMetadata_, table);
}
- public Collection<Range> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress)
+ public Collection<Range> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress, String table)
{
TokenMetadata temp = metadata.cloneOnlyTokenMap();
temp.updateNormalToken(pendingToken, pendingAddress);
- return getAddressRanges(temp).get(pendingAddress);
+ return getAddressRanges(temp, table).get(pendingAddress);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java Thu Feb 4 15:21:31 2010
@@ -48,7 +48,6 @@
private static Map<String, Integer> dcReplicationFactor = new HashMap<String, Integer>();
private static Map<String, Integer> quorumRepFactor = new HashMap<String, Integer>();
private static int locQFactor = 0;
- private static DatacenterEndPointSnitch endPointSnitch;
ArrayList<Token> tokens;
private List<InetAddress> localEndPoints = new ArrayList<InetAddress>();
@@ -69,14 +68,13 @@
*/
private synchronized void loadEndPoints(TokenMetadata metadata) throws IOException
{
- endPointSnitch = (DatacenterEndPointSnitch) StorageService.instance.getEndPointSnitch();
this.tokens = new ArrayList<Token>(tokens);
- String localDC = endPointSnitch.getLocation(InetAddress.getLocalHost());
+ String localDC = ((DatacenterEndPointSnitch)snitch_).getLocation(InetAddress.getLocalHost());
dcMap = new HashMap<String, List<Token>>();
for (Token token : this.tokens)
{
InetAddress endPoint = metadata.getEndPoint(token);
- String dataCenter = endPointSnitch.getLocation(endPoint);
+ String dataCenter = ((DatacenterEndPointSnitch)snitch_).getLocation(endPoint);
if (dataCenter.equals(localDC))
{
localEndPoints.add(endPoint);
@@ -95,7 +93,7 @@
Collections.sort(valueList);
dcMap.put(entry.getKey(), valueList);
}
- dcReplicationFactor = endPointSnitch.getMapReplicationFactor();
+ dcReplicationFactor = ((DatacenterEndPointSnitch)snitch_).getMapReplicationFactor();
for (Entry<String, Integer> entry : dcReplicationFactor.entrySet())
{
String datacenter = entry.getKey();
@@ -108,17 +106,17 @@
}
}
- public DatacenterShardStategy(TokenMetadata tokenMetadata, int replicas)
+ public DatacenterShardStategy(TokenMetadata tokenMetadata, IEndPointSnitch snitch)
throws UnknownHostException
{
- super(tokenMetadata, replicas);
- if ((!(DatabaseDescriptor.getEndPointSnitch() instanceof DatacenterEndPointSnitch)))
+ super(tokenMetadata, snitch);
+ if ((!(snitch instanceof DatacenterEndPointSnitch)))
{
throw new IllegalArgumentException("DatacenterShardStrategy requires DatacenterEndpointSnitch");
}
}
- public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata)
+ public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table)
{
try
{
@@ -187,7 +185,7 @@
// Now try to find one on a different rack
if (!bOtherRack)
{
- if (!endPointSnitch.isOnSameRack(primaryHost, endPointOfIntrest))
+ if (!((DatacenterEndPointSnitch)snitch_).isOnSameRack(primaryHost, endPointOfIntrest))
{
forloopReturn.add(metadata.getEndPoint((Token) tokens.get(i)));
bOtherRack = true;
@@ -228,16 +226,16 @@
* return a DCQRH with a map of all the DC rep facor.
*/
@Override
- public WriteResponseHandler getWriteResponseHandler(int blockFor, ConsistencyLevel consistency_level)
+ public WriteResponseHandler getWriteResponseHandler(int blockFor, ConsistencyLevel consistency_level, String table)
{
if (consistency_level == ConsistencyLevel.DCQUORUM)
{
- return new DatacenterWriteResponseHandler(locQFactor);
+ return new DatacenterWriteResponseHandler(locQFactor, table);
}
else if (consistency_level == ConsistencyLevel.DCQUORUMSYNC)
{
- return new DatacenterSyncWriteResponseHandler(getQuorumRepFactor());
+ return new DatacenterSyncWriteResponseHandler(getQuorumRepFactor(), table);
}
- return super.getWriteResponseHandler(blockFor, consistency_level);
+ return super.getWriteResponseHandler(blockFor, consistency_level, table);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Thu Feb 4 15:21:31 2010
@@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.List;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Token;
import java.net.InetAddress;
import org.apache.cassandra.service.StorageService;
@@ -36,12 +37,14 @@
*/
public class RackAwareStrategy extends AbstractReplicationStrategy
{
- public RackAwareStrategy(TokenMetadata tokenMetadata, int replicas)
+ public RackAwareStrategy(TokenMetadata tokenMetadata, IEndPointSnitch snitch)
{
- super(tokenMetadata, replicas);
+ super(tokenMetadata, snitch);
+ if (!(snitch instanceof EndPointSnitch))
+ throw new IllegalArgumentException(("RackAwareStrategy requires EndPointSnitch."));
}
- public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata)
+ public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table)
{
int startIndex;
ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
@@ -65,20 +68,20 @@
Token primaryToken = (Token) tokens.get(index);
endpoints.add(metadata.getEndPoint(primaryToken));
foundCount++;
- if (replicas_ == 1)
+ final int replicas = DatabaseDescriptor.getReplicationFactor(table);
+ if (replicas == 1)
{
return endpoints;
}
startIndex = (index + 1)%totalNodes;
- EndPointSnitch endPointSnitch = (EndPointSnitch) StorageService.instance.getEndPointSnitch();
- for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas_; ++count, i = (i + 1) % totalNodes)
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas; ++count, i = (i + 1) % totalNodes)
{
try
{
// First try to find one in a different data center
Token t = (Token) tokens.get(i);
- if (!endPointSnitch.isInSameDataCenter(metadata.getEndPoint(primaryToken), metadata.getEndPoint(t)))
+ if (!((EndPointSnitch)snitch_).isInSameDataCenter(metadata.getEndPoint(primaryToken), metadata.getEndPoint(t)))
{
// If we have already found something in a diff datacenter no need to find another
if (!bDataCenter)
@@ -90,8 +93,8 @@
continue;
}
// Now try to find one on a different rack
- if (!endPointSnitch.isOnSameRack(metadata.getEndPoint(primaryToken), metadata.getEndPoint(t)) &&
- endPointSnitch.isInSameDataCenter(metadata.getEndPoint(primaryToken), metadata.getEndPoint(t)))
+ if (!((EndPointSnitch)snitch_).isOnSameRack(metadata.getEndPoint(primaryToken), metadata.getEndPoint(t)) &&
+ ((EndPointSnitch)snitch_).isInSameDataCenter(metadata.getEndPoint(primaryToken), metadata.getEndPoint(t)))
{
// If we have already found something in a diff rack no need to find another
if (!bOtherRack)
@@ -110,7 +113,7 @@
}
// If we found N number of nodes we are good. This loop wil just exit. Otherwise just
// loop through the list and add until we have N nodes.
- for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas_; ++count, i = (i+1)%totalNodes)
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas; ++count, i = (i+1)%totalNodes)
{
Token t = (Token) tokens.get(i);
if (!endpoints.contains(metadata.getEndPoint(t)))
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java Thu Feb 4 15:21:31 2010
@@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.List;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Token;
import java.net.InetAddress;
@@ -33,12 +34,12 @@
*/
public class RackUnawareStrategy extends AbstractReplicationStrategy
{
- public RackUnawareStrategy(TokenMetadata tokenMetadata, int replicas)
+ public RackUnawareStrategy(TokenMetadata tokenMetadata, IEndPointSnitch snitch)
{
- super(tokenMetadata, replicas);
+ super(tokenMetadata, snitch);
}
- public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata)
+ public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table)
{
int startIndex;
List<Token> tokenList = new ArrayList<Token>();
@@ -61,7 +62,8 @@
startIndex = (index + 1) % totalNodes;
// If we found N number of nodes we are good. This loop will just exit. Otherwise just
// loop through the list and add until we have N nodes.
- for (int i = startIndex, count = 1; count < totalNodes && tokenList.size() < replicas_; ++count, i = (i + 1) % totalNodes)
+ final int replicas = DatabaseDescriptor.getReplicationFactor(table);
+ for (int i = startIndex, count = 1; count < totalNodes && tokenList.size() < replicas; ++count, i = (i + 1) % totalNodes)
{
assert !tokenList.contains(tokens.get(i));
tokenList.add((Token) tokens.get(i));
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Thu Feb 4 15:21:31 2010
@@ -19,6 +19,8 @@
package org.apache.cassandra.locator;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -55,7 +57,7 @@
// (See CASSANDRA-603 for more detail + examples).
private Set<InetAddress> leavingEndPoints;
- private Multimap<Range, InetAddress> pendingRanges;
+ private ConcurrentMap<String, Multimap<Range, InetAddress>> pendingRanges;
/* Use this lock for manipulating the token map */
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -73,7 +75,7 @@
this.tokenToEndPointMap = tokenToEndPointMap;
bootstrapTokens = HashBiMap.create();
leavingEndPoints = new HashSet<InetAddress>();
- pendingRanges = HashMultimap.create();
+ pendingRanges = new ConcurrentHashMap<String, Multimap<Range, InetAddress>>();
sortedTokens = sortTokens();
}
@@ -335,16 +337,27 @@
}
}
+ private synchronized Multimap<Range, InetAddress> getPendingRangesMM(String table)
+ {
+ Multimap<Range, InetAddress> map = pendingRanges.get(table);
+ if (map == null)
+ {
+ map = HashMultimap.create();
+ pendingRanges.put(table, map);
+ }
+ return map;
+ }
+
/** a mutable map may be returned but caller should not modify it */
- public Map<Range, Collection<InetAddress>> getPendingRanges()
+ public Map<Range, Collection<InetAddress>> getPendingRanges(String table)
{
- return pendingRanges.asMap();
+ return getPendingRangesMM(table).asMap();
}
- public List<Range> getPendingRanges(InetAddress endpoint)
+ public List<Range> getPendingRanges(String table, InetAddress endpoint)
{
List<Range> ranges = new ArrayList<Range>();
- for (Map.Entry<Range, InetAddress> entry : pendingRanges.entries())
+ for (Map.Entry<Range, InetAddress> entry : getPendingRangesMM(table).entries())
{
if (entry.getValue().equals(endpoint))
{
@@ -354,9 +367,9 @@
return ranges;
}
- public void setPendingRanges(Multimap<Range, InetAddress> pendingRanges)
+ public void setPendingRanges(String table, Multimap<Range, InetAddress> rangeMap)
{
- this.pendingRanges = pendingRanges;
+ pendingRanges.put(table, rangeMap);
}
public Token getPredecessor(Token token)
@@ -463,10 +476,13 @@
{
StringBuilder sb = new StringBuilder();
- for (Map.Entry<Range, InetAddress> entry : pendingRanges.entries())
+ for (Map.Entry<String, Multimap<Range, InetAddress>> entry : pendingRanges.entrySet())
{
- sb.append(entry.getValue() + ":" + entry.getKey());
- sb.append(System.getProperty("line.separator"));
+ for (Map.Entry<Range, InetAddress> rmap : entry.getValue().entries())
+ {
+ sb.append(rmap.getValue() + ":" + rmap.getKey());
+ sb.append(System.getProperty("line.separator"));
+ }
}
return sb.toString();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Thu Feb 4 15:21:31 2010
@@ -142,11 +142,11 @@
/**
* Return all of the neighbors with whom we share data.
*/
- private static Collection<InetAddress> getNeighbors()
+ private static Collection<InetAddress> getNeighbors(String table)
{
InetAddress local = FBUtilities.getLocalAddress();
StorageService ss = StorageService.instance;
- return Collections2.filter(ss.getNaturalEndpoints(ss.getLocalToken()),
+ return Collections2.filter(ss.getNaturalEndpoints(table, ss.getLocalToken()),
Predicates.not(Predicates.equalTo(local)));
}
@@ -158,7 +158,7 @@
* @param endpoint The endpoint which owns the given tree.
* @param tree The tree for the endpoint.
*/
- void rendezvous(CFPair cf, InetAddress endpoint, MerkleTree tree)
+ private void rendezvous(CFPair cf, InetAddress endpoint, MerkleTree tree)
{
InetAddress LOCAL = FBUtilities.getLocalAddress();
@@ -169,7 +169,7 @@
if (LOCAL.equals(endpoint))
{
// we're registering a local tree: rendezvous with all remote trees
- for (InetAddress neighbor : getNeighbors())
+ for (InetAddress neighbor : getNeighbors(cf.left))
{
TreePair waiting = ctrees.remove(neighbor);
if (waiting != null && waiting.right != null)
@@ -243,7 +243,7 @@
* @param remote The remote endpoint for the rendezvous.
* @return The tree pair for the given rendezvous if it exists, else null.
*/
- TreePair getRendezvousPair(String table, String cf, InetAddress remote)
+ TreePair getRendezvousPair_TestsOnly(String table, String cf, InetAddress remote)
{
return rendezvousPairs(new CFPair(table, cf)).get(remote);
}
@@ -251,7 +251,7 @@
/**
* Should only be used for testing.
*/
- void clearNaturalRepairs()
+ void clearNaturalRepairs_TestsOnly()
{
naturalRepairs.clear();
}
@@ -484,7 +484,7 @@
AntiEntropyService aes = AntiEntropyService.instance;
InetAddress local = FBUtilities.getLocalAddress();
- Collection<InetAddress> neighbors = getNeighbors();
+ Collection<InetAddress> neighbors = getNeighbors(cf.left);
// store the local tree and then broadcast it to our neighbors
aes.rendezvous(cf, local, tree);
@@ -562,8 +562,8 @@
rtree.partitioner(ss.getPartitioner());
// determine the ranges where responsibility overlaps
- Set<Range> interesting = new HashSet(ss.getRangesForEndPoint(local));
- interesting.retainAll(ss.getRangesForEndPoint(remote));
+ Set<Range> interesting = new HashSet(ss.getRangesForEndPoint(cf.left, local));
+ interesting.retainAll(ss.getRangesForEndPoint(cf.left, remote));
// compare trees, and filter out uninteresting differences
for (MerkleTree.TreeRange diff : MerkleTree.difference(ltree, rtree))
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Thu Feb 4 15:21:31 2010
@@ -44,12 +44,12 @@
private final Map<String, Integer> responseCounts;
private final DatacenterEndPointSnitch endPointSnitch;
- public DatacenterSyncWriteResponseHandler(Map<String, Integer> responseCounts)
+ public DatacenterSyncWriteResponseHandler(Map<String, Integer> responseCounts, String table)
{
// Response is been managed by the map so make it 1 for the superclass.
- super(1);
+ super(1, table);
this.responseCounts = responseCounts;
- endPointSnitch = (DatacenterEndPointSnitch) DatabaseDescriptor.getEndPointSnitch();
+ endPointSnitch = (DatacenterEndPointSnitch) DatabaseDescriptor.getEndPointSnitch(table);
}
@Override
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java Thu Feb 4 15:21:31 2010
@@ -44,12 +44,12 @@
private DatacenterEndPointSnitch endpointsnitch;
private InetAddress localEndpoint;
- public DatacenterWriteResponseHandler(int blockFor)
+ public DatacenterWriteResponseHandler(int blockFor, String table)
{
// Response is been managed by the map so the waitlist size really doesnt matter.
- super(blockFor);
+ super(blockFor, table);
this.blockFor = blockFor;
- endpointsnitch = (DatacenterEndPointSnitch) DatabaseDescriptor.getEndPointSnitch();
+ endpointsnitch = (DatacenterEndPointSnitch) DatabaseDescriptor.getEndPointSnitch(table);
localEndpoint = FBUtilities.getLocalAddress();
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Thu Feb 4 15:21:31 2010
@@ -49,7 +49,7 @@
public ReadResponseResolver(String table, int responseCount)
{
- assert 1 <= responseCount && responseCount <= DatabaseDescriptor.getReplicationFactor()
+ assert 1 <= responseCount && responseCount <= DatabaseDescriptor.getReplicationFactor(table)
: "invalid response count " + responseCount;
this.responseCount = responseCount;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Feb 4 15:21:31 2010
@@ -105,8 +105,8 @@
{
try
{
- List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(rm.key());
- Map<InetAddress, InetAddress> endpointMap = StorageService.instance.getHintedEndpointMap(rm.key(), naturalEndpoints);
+ List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(rm.getTable(), rm.key());
+ Map<InetAddress, InetAddress> endpointMap = StorageService.instance.getHintedEndpointMap(rm.getTable(), rm.key(), naturalEndpoints);
Message unhintedMessage = null; // lazy initialize for non-local, unhinted writes
// 3 cases:
@@ -174,15 +174,15 @@
for (RowMutation rm: mutations)
{
mostRecentRowMutation = rm;
- List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(rm.key());
- Map<InetAddress, InetAddress> endpointMap = StorageService.instance.getHintedEndpointMap(rm.key(), naturalEndpoints);
+ List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(rm.getTable(), rm.key());
+ Map<InetAddress, InetAddress> endpointMap = StorageService.instance.getHintedEndpointMap(rm.getTable(), rm.key(), naturalEndpoints);
int blockFor = determineBlockFor(naturalEndpoints.size(), endpointMap.size(), consistency_level);
// avoid starting a write we know can't achieve the required consistency
assureSufficientLiveNodes(endpointMap, blockFor, consistency_level);
// send out the writes, as in insert() above, but this time with a callback that tracks responses
- final WriteResponseHandler responseHandler = StorageService.instance.getWriteResponseHandler(blockFor, consistency_level);
+ final WriteResponseHandler responseHandler = StorageService.instance.getWriteResponseHandler(blockFor, consistency_level, rm.getTable());
responseHandlers.add(responseHandler);
Message unhintedMessage = null;
for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
@@ -337,7 +337,7 @@
for (ReadCommand command: commands)
{
- InetAddress endPoint = StorageService.instance.findSuitableEndPoint(command.key);
+ InetAddress endPoint = StorageService.instance.findSuitableEndPoint(command.table, command.key);
Message message = command.makeReadMessage();
if (logger.isDebugEnabled())
@@ -376,7 +376,7 @@
for (ReadCommand command: commands)
{
- List<InetAddress> endpoints = StorageService.instance.getNaturalEndpoints(command.key);
+ List<InetAddress> endpoints = StorageService.instance.getNaturalEndpoints(command.table, command.key);
boolean foundLocal = endpoints.contains(FBUtilities.getLocalAddress());
//TODO: Throw InvalidRequest if we're in bootstrap mode?
if (foundLocal && !StorageService.instance.isBootstrapMode())
@@ -423,7 +423,6 @@
List<InetAddress[]> commandEndPoints = new ArrayList<InetAddress[]>();
List<Row> rows = new ArrayList<Row>();
- int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(), DatabaseDescriptor.getReplicationFactor(), consistency_level);
int commandIndex = 0;
for (ReadCommand command: commands)
@@ -434,8 +433,10 @@
Message message = command.makeReadMessage();
Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
- InetAddress dataPoint = StorageService.instance.findSuitableEndPoint(command.key);
- List<InetAddress> endpointList = StorageService.instance.getLiveNaturalEndpoints(command.key);
+ InetAddress dataPoint = StorageService.instance.findSuitableEndPoint(command.table, command.key);
+ List<InetAddress> endpointList = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
+ final String table = command.table;
+ int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(table), DatabaseDescriptor.getReplicationFactor(table), consistency_level);
if (endpointList.size() < responseCount)
throw new UnavailableException();
@@ -452,7 +453,7 @@
if (logger.isDebugEnabled())
logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
}
- QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver(command.table, responseCount));
+ QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(command.table), new ReadResponseResolver(command.table, responseCount));
MessagingService.instance.sendRR(messages, endPoints, quorumResponseHandler);
quorumResponseHandlers.add(quorumResponseHandler);
commandEndPoints.add(endPoints);
@@ -476,9 +477,9 @@
{
if (DatabaseDescriptor.getConsistencyCheck())
{
- IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver(command.table, DatabaseDescriptor.getQuorum());
+ IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver(command.table, DatabaseDescriptor.getQuorum(command.table));
QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
- DatabaseDescriptor.getQuorum(),
+ DatabaseDescriptor.getQuorum(command.table),
readResponseResolverRepair);
logger.info("DigestMismatchException: " + ex.getMessage());
Message messageRepair = command.makeReadMessage();
@@ -539,13 +540,14 @@
InetAddress endPoint = StorageService.instance.getPrimary(command.startKey.token);
InetAddress startEndpoint = endPoint;
- int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(), DatabaseDescriptor.getReplicationFactor(), consistency_level);
+ final String table = command.keyspace;
+ int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(table), DatabaseDescriptor.getReplicationFactor(table), consistency_level);
Map<String, ColumnFamily> rows = new HashMap<String, ColumnFamily>(command.max_keys);
do
{
Range primaryRange = StorageService.instance.getPrimaryRangeForEndPoint(endPoint);
- List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(primaryRange.right);
+ List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace, primaryRange.right);
if (endpoints.size() < responseCount)
throw new UnavailableException();
@@ -666,7 +668,7 @@
public Object call() throws IOException
{
- List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.key);
+ List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
/* Remove the local storage endpoint from the list. */
endpoints.remove(FBUtilities.getLocalAddress());