You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2010/02/04 16:21:33 UTC

svn commit: r906521 [1/3] - in /incubator/cassandra/trunk: ./ conf/ contrib/circuit/src/org/apache/cassandra/contrib/circuit/ src/java/org/apache/cassandra/client/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apa...

Author: gdusbabek
Date: Thu Feb  4 15:21:31 2010
New Revision: 906521

URL: http://svn.apache.org/viewvc?rev=906521&view=rev
Log:
add per-keyspace replication factor and replication strategy. Patch by Gary Dusbabek, reviewed by Jaakko Laine.  CASSANDRA-620

Modified:
    incubator/cassandra/trunk/CHANGES.txt
    incubator/cassandra/trunk/conf/storage-conf.xml
    incubator/cassandra/trunk/contrib/circuit/src/org/apache/cassandra/contrib/circuit/RingModel.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/ClusterCmd.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
    incubator/cassandra/trunk/test/conf/storage-conf.xml
    incubator/cassandra/trunk/test/system/test_server.py
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java

Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Thu Feb  4 15:21:31 2010
@@ -19,6 +19,7 @@
  * report latency and cache hit rate statistics with lifetime totals
    instead of average over the last minute (CASSANDRA-702)
  * support get_range_slice for RandomPartitioner (CASSANDRA-745)
+ * per-keyspace replication factory and replication strategy (CASSANDRA-620)
 
 
 0.5.1

Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Thu Feb  4 15:21:31 2010
@@ -111,6 +111,30 @@
                     RowsCached="1000"
                     KeysCachedFraction="0"
                     Comment="A column family with supercolumns, whose column and subcolumn names are UTF8 strings"/>
+
+      <!--
+       ~ Strategy: Setting this to the class that implements
+       ~ IReplicaPlacementStrategy will change the way the node picker works.
+       ~ Out of the box, Cassandra provides
+       ~ org.apache.cassandra.locator.RackUnawareStrategy and
+       ~ org.apache.cassandra.locator.RackAwareStrategy (place one replica in
+       ~ a different datacenter, and the others on different racks in the same
+       ~ one.)
+      -->
+      <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+
+      <!-- Number of replicas of the data -->
+      <ReplicationFactor>1</ReplicationFactor>
+
+      <!--
+       ~ EndPointSnitch: Setting this to the class that implements
+       ~ AbstractEndpointSnitch, which lets Cassandra know enough
+       ~ about your network topology to route requests efficiently.
+       ~ Out of the box, Cassandra provides org.apache.cassandra.locator.EndPointSnitch,
+       ~ and PropertyFileEndPointSnitch is available in contrib/.
+      -->
+      <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
+        
     </Keyspace>
   </Keyspaces>
 
@@ -157,29 +181,6 @@
   <InitialToken></InitialToken>
 
   <!--
-   ~ EndPointSnitch: Setting this to the class that implements
-   ~ AbstractEndpointSnitch, which lets Cassandra know enough
-   ~ about your network topology to route requests efficiently.
-   ~ Out of the box, Cassandra provides org.apache.cassandra.locator.EndPointSnitch,
-   ~ and PropertyFileEndPointSnitch is available in contrib/.
-  -->
-  <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
-
-  <!--
-   ~ Strategy: Setting this to the class that implements
-   ~ IReplicaPlacementStrategy will change the way the node picker works.
-   ~ Out of the box, Cassandra provides
-   ~ org.apache.cassandra.locator.RackUnawareStrategy and
-   ~ org.apache.cassandra.locator.RackAwareStrategy (place one replica in
-   ~ a different datacenter, and the others on different racks in the same
-   ~ one.)
-  -->
-  <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
-
-  <!-- Number of replicas of the data -->
-  <ReplicationFactor>1</ReplicationFactor>
-
-  <!--
    ~ Directories: Specify where Cassandra should store different data on
    ~ disk.  Keep the data disks and the CommitLog disks separate for best
    ~ performance

Modified: incubator/cassandra/trunk/contrib/circuit/src/org/apache/cassandra/contrib/circuit/RingModel.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/circuit/src/org/apache/cassandra/contrib/circuit/RingModel.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/contrib/circuit/src/org/apache/cassandra/contrib/circuit/RingModel.java (original)
+++ incubator/cassandra/trunk/contrib/circuit/src/org/apache/cassandra/contrib/circuit/RingModel.java Thu Feb  4 15:21:31 2010
@@ -101,7 +101,7 @@
                     "Invalid ObjectName? Please report this as a bug.", e);
         }
 
-        Map<Range, List<String>> rangeMap = ssProxy.getRangeToEndPointMap();
+        Map<Range, List<String>> rangeMap = ssProxy.getRangeToEndPointMap(null);
         List<Range> ranges = new ArrayList<Range>(rangeMap.keySet());
         Collections.sort(ranges);
         

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java Thu Feb  4 15:21:31 2010
@@ -48,9 +48,9 @@
     final private static Logger logger_ = Logger.getLogger(RingCache.class);
 
     private Set<String> seeds_ = new HashSet<String>();
-    final private int port_=DatabaseDescriptor.getThriftPort();
-    private volatile AbstractReplicationStrategy nodePicker_;
+    final private int port_= DatabaseDescriptor.getThriftPort();
     final private static IPartitioner partitioner_ = DatabaseDescriptor.getPartitioner();
+    private TokenMetadata tokenMetadata;
 
     public RingCache()
     {
@@ -89,8 +89,7 @@
                     }
                 }
 
-                TokenMetadata tokenMetadata = new TokenMetadata(tokenEndpointMap);
-                nodePicker_ = StorageService.getReplicationStrategy(tokenMetadata);
+                tokenMetadata = new TokenMetadata(tokenEndpointMap);
 
                 break;
             }
@@ -102,8 +101,11 @@
         }
     }
 
-    public List<InetAddress> getEndPoint(String key)
+    public List<InetAddress> getEndPoint(String table, String key)
     {
-        return nodePicker_.getNaturalEndpoints(partitioner_.getToken(key));
+        if (tokenMetadata == null)
+            throw new RuntimeException("Must refresh endpoints before looking up a key.");
+        AbstractReplicationStrategy strat = StorageService.getReplicationStrategy(tokenMetadata, table);
+        return strat.getNaturalEndpoints(partitioner_.getToken(key), table);
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Feb  4 15:21:31 2010
@@ -69,7 +69,6 @@
     private static InetAddress listenAddress_; // leave null so we can fall through to getLocalHost
     private static InetAddress thriftAddress_;
     private static String clusterName_ = "Test";
-    private static int replicationFactor_ = 3;
     private static long rpcTimeoutInMillis_ = 2000;
     private static Set<InetAddress> seeds_ = new HashSet<InetAddress>();
     /* Keeps the list of data file directories */
@@ -96,12 +95,17 @@
      * corresponding meta data for that column family.
     */
     private static Map<String, Map<String, CFMetaData>> tableToCFMetaDataMap_;
+
+    // map tables to replication strategies.
+    private static Map<String, Class<AbstractReplicationStrategy>> replicationStrategyClasses_;
+
+    // map tables to replication factors.
+    private static Map<String, Integer> replicationFactors_;
+
     /* Hashing strategy Random or OPHF */
     private static IPartitioner partitioner_;
 
-    private static IEndPointSnitch endPointSnitch_;
-
-    private static Class<AbstractReplicationStrategy> replicaPlacementStrategyClass_;
+    private static Map<String, IEndPointSnitch> endPointSnitches_;
 
     /* if the size of columns or super-columns are more than this, indexing will kick in */
     private static int columnIndexSizeInKB_;
@@ -256,22 +260,6 @@
                 throw new ConfigurationException("Invalid partitioner class " + partitionerClassName);
             }
 
-            /* end point snitch */
-            String endPointSnitchClassName = xmlUtils.getNodeValue("/Storage/EndPointSnitch");
-            if (endPointSnitchClassName == null)
-            {
-                throw new ConfigurationException("Missing endpointsnitch directive /Storage/EndPointSnitch");
-            }
-            try
-            {
-                Class cls = Class.forName(endPointSnitchClassName);
-                endPointSnitch_ = (IEndPointSnitch) cls.getConstructor().newInstance();
-            }
-            catch (ClassNotFoundException e)
-            {
-                throw new ConfigurationException("Invalid endpointsnitch class " + endPointSnitchClassName);
-            }
-            
             /* JobTracker address */
             jobTrackerHost_ = xmlUtils.getNodeValue("/Storage/JobTrackerHost");
 
@@ -284,11 +272,6 @@
 
             initialToken_ = xmlUtils.getNodeValue("/Storage/InitialToken");
 
-            /* Data replication factor */
-            String replicationFactor = xmlUtils.getNodeValue("/Storage/ReplicationFactor");
-            if ( replicationFactor != null )
-                replicationFactor_ = Integer.parseInt(replicationFactor);
-
             /* RPC Timeout */
             String rpcTimeoutInMillis = xmlUtils.getNodeValue("/Storage/RpcTimeoutInMillis");
             if ( rpcTimeoutInMillis != null )
@@ -457,21 +440,9 @@
                 CommitLog.setSegmentSize(Integer.parseInt(value) * 1024 * 1024);
 
             tableToCFMetaDataMap_ = new HashMap<String, Map<String, CFMetaData>>();
-
-            /* See which replica placement strategy to use */
-            String replicaPlacementStrategyClassName = xmlUtils.getNodeValue("/Storage/ReplicaPlacementStrategy");
-            if (replicaPlacementStrategyClassName == null)
-            {
-                throw new ConfigurationException("Missing replicaplacementstrategy directive /Storage/ReplicaPlacementStrategy");
-            }
-            try
-            {
-                replicaPlacementStrategyClass_ = (Class<AbstractReplicationStrategy>) Class.forName(replicaPlacementStrategyClassName);
-            }
-            catch (ClassNotFoundException e)
-            {
-                throw new ConfigurationException("Invalid replicaplacementstrategy class " + replicaPlacementStrategyClassName);
-            }
+            replicationFactors_ = new HashMap<String, Integer>();
+            replicationStrategyClasses_ = new HashMap<String, Class<AbstractReplicationStrategy>>();
+            endPointSnitches_ = new HashMap<String, IEndPointSnitch>();
 
             /* Read the table related stuff from config */
             NodeList tables = xmlUtils.getRequestedNodeList("/Storage/Keyspaces/Keyspace");
@@ -493,6 +464,45 @@
                 tables_.add(tName);
                 tableToCFMetaDataMap_.put(tName, new HashMap<String, CFMetaData>());
 
+                /* See which replica placement strategy to use */
+                String replicaPlacementStrategyClassName = xmlUtils.getNodeValue("/Storage/Keyspaces/Keyspace[@Name='" + tName + "']/ReplicaPlacementStrategy");
+                if (replicaPlacementStrategyClassName == null)
+                {
+                    throw new ConfigurationException("Missing replicaplacementstrategy directive for " + tName);
+                }
+                try
+                {
+                    Class cls = (Class<AbstractReplicationStrategy>) Class.forName(replicaPlacementStrategyClassName);
+                    replicationStrategyClasses_.put(tName, cls);
+                }
+                catch (ClassNotFoundException e)
+                {
+                    throw new ConfigurationException("Invalid replicaplacementstrategy class " + replicaPlacementStrategyClassName);
+                }
+
+                /* Data replication factor */
+                String replicationFactor = xmlUtils.getNodeValue("/Storage/Keyspaces/Keyspace[@Name='" + tName + "']/ReplicationFactor");
+                if (replicationFactor == null)
+                    throw new ConfigurationException("Missing replicationfactor directory for keyspace " + tName);
+                else
+                    replicationFactors_.put(tName, Integer.parseInt(replicationFactor));
+
+                /* end point snitch */
+                String endPointSnitchClassName = xmlUtils.getNodeValue("/Storage/Keyspaces/Keyspace[@Name='" + tName + "']/EndPointSnitch");
+                if (endPointSnitchClassName == null)
+                {
+                    throw new ConfigurationException("Missing endpointsnitch directive for keyspace " + tName);
+                }
+                try
+                {
+                    Class cls = Class.forName(endPointSnitchClassName);
+                    endPointSnitches_.put(tName, (IEndPointSnitch)cls.getConstructor().newInstance());
+                }
+                catch (ClassNotFoundException e)
+                {
+                    throw new ConfigurationException("Invalid endpointsnitch class " + endPointSnitchClassName);
+                }
+
                 String xqlTable = "/Storage/Keyspaces/Keyspace[@Name='" + tName + "']/";
                 NodeList columnFamilies = xmlUtils.getRequestedNodeList(xqlTable + "ColumnFamily");
 
@@ -729,14 +739,14 @@
         return partitioner_;
     }
     
-    public static IEndPointSnitch getEndPointSnitch()
+    public static IEndPointSnitch getEndPointSnitch(String table)
     {
-        return endPointSnitch_;
+        return endPointSnitches_.get(table);
     }
 
-    public static Class<AbstractReplicationStrategy> getReplicaPlacementStrategyClass()
+    public static Class<AbstractReplicationStrategy> getReplicaPlacementStrategyClass(String table)
     {
-        return replicaPlacementStrategyClass_;
+        return replicationStrategyClasses_.get(table);
     }
     
     public static String getJobTrackerAddress()
@@ -851,14 +861,14 @@
         return thriftPort_;
     }
 
-    public static int getReplicationFactor()
+    public static int getReplicationFactor(String table)
     {
-        return replicationFactor_;
+        return replicationFactors_.get(table);
     }
 
-    public static int getQuorum()
+    public static int getQuorum(String table)
     {
-        return (replicationFactor_ / 2) + 1;
+        return (replicationFactors_.get(table) / 2) + 1;
     }
 
     public static long getRpcTimeout()
@@ -1079,8 +1089,9 @@
     /**
      * For testing purposes.
      */
-    static void setReplicationFactorUnsafe(int factor)
+    static void setReplicationFactorUnsafe(String table, int factor)
     {
-        replicationFactor_ = factor;
+        replicationFactors_.remove(table);
+        replicationFactors_.put(table, factor);
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Thu Feb  4 15:21:31 2010
@@ -433,7 +433,7 @@
     private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
     {
         Collection<SSTableReader> originalSSTables = cfs.getSSTables();
-        List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables, StorageService.instance.getLocalRanges(), null);
+        List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables, StorageService.instance.getLocalRanges(cfs.getTable().name), null);
         if (!sstables.isEmpty())
         {
             cfs.replaceCompactedSSTables(originalSSTables, sstables);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Thu Feb  4 15:21:31 2010
@@ -124,7 +124,7 @@
                 rm.add(cf);
         }
         Message message = rm.makeRowMutationMessage();
-        WriteResponseHandler responseHandler = new WriteResponseHandler(1);
+        WriteResponseHandler responseHandler = new WriteResponseHandler(1, tableName);
         MessagingService.instance.sendRR(message, new InetAddress[] { endPoint }, responseHandler);
 
         try

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Thu Feb  4 15:21:31 2010
@@ -106,7 +106,7 @@
             /* Do read repair if header of the message says so */
             if (message.getHeader(ReadCommand.DO_REPAIR) != null)
             {
-                List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.key);
+                List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
                 /* Remove the local storage endpoint from the list. */
                 endpoints.remove(FBUtilities.getLocalAddress());
                 if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Thu Feb  4 15:21:31 2010
@@ -24,6 +24,7 @@
  import java.io.UnsupportedEncodingException;
  import java.net.InetAddress;
 
+ import org.apache.commons.lang.StringUtils;
  import org.apache.log4j.Logger;
 
  import org.apache.commons.lang.ArrayUtils;
@@ -51,14 +52,12 @@
     /* tokens of the nodes being bootstrapped. */
     protected final Token token;
     protected final TokenMetadata tokenMetadata;
-    private final AbstractReplicationStrategy replicationStrategy;
 
-    public BootStrapper(AbstractReplicationStrategy rs, InetAddress address, Token token, TokenMetadata tmd)
+    public BootStrapper(InetAddress address, Token token, TokenMetadata tmd)
     {
         assert address != null;
         assert token != null;
 
-        replicationStrategy = rs;
         this.address = address;
         this.token = token;
         tokenMetadata = tmd;
@@ -70,16 +69,20 @@
         {
             public void run()
             {
-                Multimap<Range, InetAddress> rangesWithSourceTarget = getRangesWithSources();
                 if (logger.isDebugEnabled())
                     logger.debug("Beginning bootstrap process");
-                /* Send messages to respective folks to stream data over to me */
-                for (Map.Entry<InetAddress, Collection<Range>> entry : getWorkMap(rangesWithSourceTarget).asMap().entrySet())
+                for (String table : DatabaseDescriptor.getNonSystemTables())
                 {
-                    InetAddress source = entry.getKey();
-                    for (String table : DatabaseDescriptor.getNonSystemTables())
+                    Multimap<Range, InetAddress> rangesWithSourceTarget = getRangesWithSources(table);
+                    /* Send messages to respective folks to stream data over to me */
+                    for (Map.Entry<InetAddress, Collection<Range>> entry : getWorkMap(rangesWithSourceTarget).asMap().entrySet())
+                    {   
+                        InetAddress source = entry.getKey();
                         StorageService.instance.addBootstrapSource(source, table);
-                    StreamIn.requestRanges(source, entry.getValue());
+                        if (logger.isDebugEnabled())
+                            logger.debug("Requesting from " + source + " ranges " + StringUtils.join(entry.getValue(), ", "));
+                        StreamIn.requestRanges(source, table, entry.getValue());
+                    }
                 }
             }
         }, "Boostrap requester").start();
@@ -144,20 +147,21 @@
     }
 
     /** get potential sources for each range, ordered by proximity (as determined by EndPointSnitch) */
-    Multimap<Range, InetAddress> getRangesWithSources()
+    Multimap<Range, InetAddress> getRangesWithSources(String table)
     {
         assert tokenMetadata.sortedTokens().size() > 0;
-        Collection<Range> myRanges = replicationStrategy.getPendingAddressRanges(tokenMetadata, token, address);
+        final AbstractReplicationStrategy strat = StorageService.instance.getReplicationStrategy(table);
+        Collection<Range> myRanges = strat.getPendingAddressRanges(tokenMetadata, token, address, table);
 
         Multimap<Range, InetAddress> myRangeAddresses = ArrayListMultimap.create();
-        Multimap<Range, InetAddress> rangeAddresses = replicationStrategy.getRangeAddresses(tokenMetadata);
+        Multimap<Range, InetAddress> rangeAddresses = strat.getRangeAddresses(tokenMetadata, table);
         for (Range range : rangeAddresses.keySet())
         {
             for (Range myRange : myRanges)
             {
                 if (range.contains(myRange))
                 {
-                    List<InetAddress> preferred = DatabaseDescriptor.getEndPointSnitch().getSortedListByProximity(address, rangeAddresses.get(range));
+                    List<InetAddress> preferred = DatabaseDescriptor.getEndPointSnitch(table).getSortedListByProximity(address, rangeAddresses.get(range));
                     myRangeAddresses.putAll(myRange, preferred);
                     break;
                 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Thu Feb  4 15:21:31 2010
@@ -21,6 +21,7 @@
 import java.net.InetAddress;
 import java.util.*;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.HashMultimap;
@@ -43,25 +44,25 @@
 {
     protected static final Logger logger_ = Logger.getLogger(AbstractReplicationStrategy.class);
 
-    protected TokenMetadata tokenMetadata_;
-    protected int replicas_;
+    private TokenMetadata tokenMetadata_;
+    protected final IEndPointSnitch snitch_;
 
-    AbstractReplicationStrategy(TokenMetadata tokenMetadata, int replicas)
+    AbstractReplicationStrategy(TokenMetadata tokenMetadata, IEndPointSnitch snitch)
     {
         tokenMetadata_ = tokenMetadata;
-        replicas_ = replicas;
+        snitch_ = snitch;
     }
 
-    public abstract ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata);
+    public abstract ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table);
     
-    public WriteResponseHandler getWriteResponseHandler(int blockFor, ConsistencyLevel consistency_level)
+    public WriteResponseHandler getWriteResponseHandler(int blockFor, ConsistencyLevel consistency_level, String table)
     {
-        return new WriteResponseHandler(blockFor);
+        return new WriteResponseHandler(blockFor, table);
     }
 
-    public ArrayList<InetAddress> getNaturalEndpoints(Token token)
+    public ArrayList<InetAddress> getNaturalEndpoints(Token token, String table)
     {
-        return getNaturalEndpoints(token, tokenMetadata_);
+        return getNaturalEndpoints(token, tokenMetadata_, table);
     }
     
     /*
@@ -69,9 +70,9 @@
      * on which the data is being placed and the value is the
      * endpoint to which it should be forwarded.
      */
-    public Map<InetAddress, InetAddress> getHintedEndpoints(Token token, Collection<InetAddress> naturalEndpoints)
+    public Map<InetAddress, InetAddress> getHintedEndpoints(Token token, String table, Collection<InetAddress> naturalEndpoints)
     {
-        return getHintedMapForEndpoints(getWriteEndpoints(token, naturalEndpoints));
+        return getHintedMapForEndpoints(table, getWriteEndpoints(token, table, naturalEndpoints));
     }
 
     /**
@@ -81,15 +82,16 @@
      * Thus, this method may return more nodes than the Replication Factor.
      *
      * Only ReplicationStrategy should care about this method (higher level users should only ask for Hinted).
+     * todo: this method should be moved into TokenMetadata.
      */
-    public Collection<InetAddress> getWriteEndpoints(Token token, Collection<InetAddress> naturalEndpoints)
+    public Collection<InetAddress> getWriteEndpoints(Token token, String table, Collection<InetAddress> naturalEndpoints)
     {
-        if (tokenMetadata_.getPendingRanges().isEmpty())
+        if (tokenMetadata_.getPendingRanges(table).isEmpty())
             return naturalEndpoints;
 
         List<InetAddress> endpoints = new ArrayList<InetAddress>(naturalEndpoints);
 
-        for (Map.Entry<Range, Collection<InetAddress>> entry : tokenMetadata_.getPendingRanges().entrySet())
+        for (Map.Entry<Range, Collection<InetAddress>> entry : tokenMetadata_.getPendingRanges(table).entrySet())
         {
             if (entry.getKey().contains(token))
             {
@@ -107,12 +109,12 @@
      *
      * A destination node may be the destination for multiple targets.
      */
-    private Map<InetAddress, InetAddress> getHintedMapForEndpoints(Collection<InetAddress> targets)
+    private Map<InetAddress, InetAddress> getHintedMapForEndpoints(String table, Collection<InetAddress> targets)
     {
         Set<InetAddress> usedEndpoints = new HashSet<InetAddress>();
         Map<InetAddress, InetAddress> map = new HashMap<InetAddress, InetAddress>();
 
-        IEndPointSnitch endPointSnitch = StorageService.instance.getEndPointSnitch();
+        IEndPointSnitch endPointSnitch = DatabaseDescriptor.getEndPointSnitch(table);
         Set<InetAddress> liveNodes = Gossiper.instance.getLiveMembers();
 
         for (InetAddress ep : targets)
@@ -160,14 +162,14 @@
      this is fine as long as we don't use this on any critical path.
      (fixing this would probably require merging tokenmetadata into replicationstrategy, so we could cache/invalidate cleanly.)
      */
-    public Multimap<InetAddress, Range> getAddressRanges(TokenMetadata metadata)
+    public Multimap<InetAddress, Range> getAddressRanges(TokenMetadata metadata, String table)
     {
         Multimap<InetAddress, Range> map = HashMultimap.create();
 
         for (Token token : metadata.sortedTokens())
         {
             Range range = metadata.getPrimaryRangeFor(token);
-            for (InetAddress ep : getNaturalEndpoints(token, metadata))
+            for (InetAddress ep : getNaturalEndpoints(token, metadata, table))
             {
                 map.put(ep, range);
             }
@@ -176,14 +178,14 @@
         return map;
     }
 
-    public Multimap<Range, InetAddress> getRangeAddresses(TokenMetadata metadata)
+    public Multimap<Range, InetAddress> getRangeAddresses(TokenMetadata metadata, String table)
     {
         Multimap<Range, InetAddress> map = HashMultimap.create();
 
         for (Token token : metadata.sortedTokens())
         {
             Range range = metadata.getPrimaryRangeFor(token);
-            for (InetAddress ep : getNaturalEndpoints(token, metadata))
+            for (InetAddress ep : getNaturalEndpoints(token, metadata, table))
             {
                 map.put(range, ep);
             }
@@ -192,16 +194,16 @@
         return map;
     }
 
-    public Multimap<InetAddress, Range> getAddressRanges()
+    public Multimap<InetAddress, Range> getAddressRanges(String table)
     {
-        return getAddressRanges(tokenMetadata_);
+        return getAddressRanges(tokenMetadata_, table);
     }
 
-    public Collection<Range> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress)
+    public Collection<Range> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress, String table)
     {
         TokenMetadata temp = metadata.cloneOnlyTokenMap();
         temp.updateNormalToken(pendingToken, pendingAddress);
-        return getAddressRanges(temp).get(pendingAddress);
+        return getAddressRanges(temp, table).get(pendingAddress);
     }
 
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java Thu Feb  4 15:21:31 2010
@@ -48,7 +48,6 @@
     private static Map<String, Integer> dcReplicationFactor = new HashMap<String, Integer>();
     private static Map<String, Integer> quorumRepFactor = new HashMap<String, Integer>();
     private static int locQFactor = 0;
-    private static DatacenterEndPointSnitch endPointSnitch;
     ArrayList<Token> tokens;
 
     private List<InetAddress> localEndPoints = new ArrayList<InetAddress>();
@@ -69,14 +68,13 @@
      */
     private synchronized void loadEndPoints(TokenMetadata metadata) throws IOException
     {
-        endPointSnitch = (DatacenterEndPointSnitch) StorageService.instance.getEndPointSnitch();
         this.tokens = new ArrayList<Token>(tokens);
-        String localDC = endPointSnitch.getLocation(InetAddress.getLocalHost());
+        String localDC = ((DatacenterEndPointSnitch)snitch_).getLocation(InetAddress.getLocalHost());
         dcMap = new HashMap<String, List<Token>>();
         for (Token token : this.tokens)
         {
             InetAddress endPoint = metadata.getEndPoint(token);
-            String dataCenter = endPointSnitch.getLocation(endPoint);
+            String dataCenter = ((DatacenterEndPointSnitch)snitch_).getLocation(endPoint);
             if (dataCenter.equals(localDC))
             {
                 localEndPoints.add(endPoint);
@@ -95,7 +93,7 @@
             Collections.sort(valueList);
             dcMap.put(entry.getKey(), valueList);
         }
-        dcReplicationFactor = endPointSnitch.getMapReplicationFactor();
+        dcReplicationFactor = ((DatacenterEndPointSnitch)snitch_).getMapReplicationFactor();
         for (Entry<String, Integer> entry : dcReplicationFactor.entrySet())
         {
             String datacenter = entry.getKey();
@@ -108,17 +106,17 @@
         }
     }
 
-    public DatacenterShardStategy(TokenMetadata tokenMetadata, int replicas)
+    public DatacenterShardStategy(TokenMetadata tokenMetadata, IEndPointSnitch snitch)
     throws UnknownHostException
     {
-        super(tokenMetadata, replicas);
-        if ((!(DatabaseDescriptor.getEndPointSnitch() instanceof DatacenterEndPointSnitch)))
+        super(tokenMetadata, snitch);
+        if ((!(snitch instanceof DatacenterEndPointSnitch)))
         {
             throw new IllegalArgumentException("DatacenterShardStrategy requires DatacenterEndpointSnitch");
         }
     }
 
-    public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata)
+    public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table)
     {
         try
         {
@@ -187,7 +185,7 @@
                 // Now try to find one on a different rack
                 if (!bOtherRack)
                 {
-                    if (!endPointSnitch.isOnSameRack(primaryHost, endPointOfIntrest))
+                    if (!((DatacenterEndPointSnitch)snitch_).isOnSameRack(primaryHost, endPointOfIntrest))
                     {
                         forloopReturn.add(metadata.getEndPoint((Token) tokens.get(i)));
                         bOtherRack = true;
@@ -228,16 +226,16 @@
      * return a DCQRH with a map of all the DC rep facor.
      */
     @Override
-    public WriteResponseHandler getWriteResponseHandler(int blockFor, ConsistencyLevel consistency_level)
+    public WriteResponseHandler getWriteResponseHandler(int blockFor, ConsistencyLevel consistency_level, String table)
     {
         if (consistency_level == ConsistencyLevel.DCQUORUM)
         {
-            return new DatacenterWriteResponseHandler(locQFactor);
+            return new DatacenterWriteResponseHandler(locQFactor, table);
         }
         else if (consistency_level == ConsistencyLevel.DCQUORUMSYNC)
         {
-            return new DatacenterSyncWriteResponseHandler(getQuorumRepFactor());
+            return new DatacenterSyncWriteResponseHandler(getQuorumRepFactor(), table);
         }
-        return super.getWriteResponseHandler(blockFor, consistency_level);
+        return super.getWriteResponseHandler(blockFor, consistency_level, table);
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Thu Feb  4 15:21:31 2010
@@ -23,6 +23,7 @@
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Token;
 import java.net.InetAddress;
 import org.apache.cassandra.service.StorageService;
@@ -36,12 +37,14 @@
  */
 public class RackAwareStrategy extends AbstractReplicationStrategy
 {
-    public RackAwareStrategy(TokenMetadata tokenMetadata, int replicas)
+    public RackAwareStrategy(TokenMetadata tokenMetadata, IEndPointSnitch snitch)
     {
-        super(tokenMetadata, replicas);
+        super(tokenMetadata, snitch);
+        if (!(snitch instanceof EndPointSnitch))
+            throw new IllegalArgumentException(("RackAwareStrategy requires EndPointSnitch."));
     }
 
-    public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata)
+    public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table)
     {
         int startIndex;
         ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
@@ -65,20 +68,20 @@
         Token primaryToken = (Token) tokens.get(index);
         endpoints.add(metadata.getEndPoint(primaryToken));
         foundCount++;
-        if (replicas_ == 1)
+        final int replicas = DatabaseDescriptor.getReplicationFactor(table);
+        if (replicas == 1)
         {
             return endpoints;
         }
         startIndex = (index + 1)%totalNodes;
-        EndPointSnitch endPointSnitch = (EndPointSnitch) StorageService.instance.getEndPointSnitch();
 
-        for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas_; ++count, i = (i + 1) % totalNodes)
+        for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas; ++count, i = (i + 1) % totalNodes)
         {
             try
             {
                 // First try to find one in a different data center
                 Token t = (Token) tokens.get(i);
-                if (!endPointSnitch.isInSameDataCenter(metadata.getEndPoint(primaryToken), metadata.getEndPoint(t)))
+                if (!((EndPointSnitch)snitch_).isInSameDataCenter(metadata.getEndPoint(primaryToken), metadata.getEndPoint(t)))
                 {
                     // If we have already found something in a diff datacenter no need to find another
                     if (!bDataCenter)
@@ -90,8 +93,8 @@
                     continue;
                 }
                 // Now  try to find one on a different rack
-                if (!endPointSnitch.isOnSameRack(metadata.getEndPoint(primaryToken), metadata.getEndPoint(t)) &&
-                    endPointSnitch.isInSameDataCenter(metadata.getEndPoint(primaryToken), metadata.getEndPoint(t)))
+                if (!((EndPointSnitch)snitch_).isOnSameRack(metadata.getEndPoint(primaryToken), metadata.getEndPoint(t)) &&
+                    ((EndPointSnitch)snitch_).isInSameDataCenter(metadata.getEndPoint(primaryToken), metadata.getEndPoint(t)))
                 {
                     // If we have already found something in a diff rack no need to find another
                     if (!bOtherRack)
@@ -110,7 +113,7 @@
         }
         // If we found N number of nodes we are good. This loop wil just exit. Otherwise just
         // loop through the list and add until we have N nodes.
-        for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas_; ++count, i = (i+1)%totalNodes)
+        for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas; ++count, i = (i+1)%totalNodes)
         {
             Token t = (Token) tokens.get(i);
             if (!endpoints.contains(metadata.getEndPoint(t)))

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java Thu Feb  4 15:21:31 2010
@@ -22,6 +22,7 @@
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Token;
 import java.net.InetAddress;
 
@@ -33,12 +34,12 @@
  */
 public class RackUnawareStrategy extends AbstractReplicationStrategy
 {
-    public RackUnawareStrategy(TokenMetadata tokenMetadata, int replicas)
+    public RackUnawareStrategy(TokenMetadata tokenMetadata, IEndPointSnitch snitch)
     {
-        super(tokenMetadata, replicas);
+        super(tokenMetadata, snitch);
     }
 
-    public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata)
+    public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table)
     {
         int startIndex;
         List<Token> tokenList = new ArrayList<Token>();
@@ -61,7 +62,8 @@
         startIndex = (index + 1) % totalNodes;
         // If we found N number of nodes we are good. This loop will just exit. Otherwise just
         // loop through the list and add until we have N nodes.
-        for (int i = startIndex, count = 1; count < totalNodes && tokenList.size() < replicas_; ++count, i = (i + 1) % totalNodes)
+        final int replicas = DatabaseDescriptor.getReplicationFactor(table);
+        for (int i = startIndex, count = 1; count < totalNodes && tokenList.size() < replicas; ++count, i = (i + 1) % totalNodes)
         {
             assert !tokenList.contains(tokens.get(i));
             tokenList.add((Token) tokens.get(i));

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Thu Feb  4 15:21:31 2010
@@ -19,6 +19,8 @@
 package org.apache.cassandra.locator;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -55,7 +57,7 @@
     // (See CASSANDRA-603 for more detail + examples).
     private Set<InetAddress> leavingEndPoints;
 
-    private Multimap<Range, InetAddress> pendingRanges;
+    private ConcurrentMap<String, Multimap<Range, InetAddress>> pendingRanges;
 
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -73,7 +75,7 @@
         this.tokenToEndPointMap = tokenToEndPointMap;
         bootstrapTokens = HashBiMap.create();
         leavingEndPoints = new HashSet<InetAddress>();
-        pendingRanges = HashMultimap.create();
+        pendingRanges = new ConcurrentHashMap<String, Multimap<Range, InetAddress>>();
         sortedTokens = sortTokens();
     }
 
@@ -335,16 +337,27 @@
         }
     }
 
+    private synchronized Multimap<Range, InetAddress> getPendingRangesMM(String table)
+    {
+        Multimap<Range, InetAddress> map = pendingRanges.get(table);
+        if (map == null)
+        {
+             map = HashMultimap.create();
+            pendingRanges.put(table, map);
+        }
+        return map;
+    }
+
     /** a mutable map may be returned but caller should not modify it */
-    public Map<Range, Collection<InetAddress>> getPendingRanges()
+    public Map<Range, Collection<InetAddress>> getPendingRanges(String table)
     {
-        return pendingRanges.asMap();
+        return getPendingRangesMM(table).asMap();
     }
 
-    public List<Range> getPendingRanges(InetAddress endpoint)
+    public List<Range> getPendingRanges(String table, InetAddress endpoint)
     {
         List<Range> ranges = new ArrayList<Range>();
-        for (Map.Entry<Range, InetAddress> entry : pendingRanges.entries())
+        for (Map.Entry<Range, InetAddress> entry : getPendingRangesMM(table).entries())
         {
             if (entry.getValue().equals(endpoint))
             {
@@ -354,9 +367,9 @@
         return ranges;
     }
 
-    public void setPendingRanges(Multimap<Range, InetAddress> pendingRanges)
+    public void setPendingRanges(String table, Multimap<Range, InetAddress> rangeMap)
     {
-        this.pendingRanges = pendingRanges;
+        pendingRanges.put(table, rangeMap);
     }
 
     public Token getPredecessor(Token token)
@@ -463,10 +476,13 @@
     {
         StringBuilder sb = new StringBuilder();
 
-        for (Map.Entry<Range, InetAddress> entry : pendingRanges.entries())
+        for (Map.Entry<String, Multimap<Range, InetAddress>> entry : pendingRanges.entrySet())
         {
-            sb.append(entry.getValue() + ":" + entry.getKey());
-            sb.append(System.getProperty("line.separator"));
+            for (Map.Entry<Range, InetAddress> rmap : entry.getValue().entries())
+            {
+                sb.append(rmap.getValue() + ":" + rmap.getKey());
+                sb.append(System.getProperty("line.separator"));
+            }
         }
 
         return sb.toString();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Thu Feb  4 15:21:31 2010
@@ -142,11 +142,11 @@
     /**
      * Return all of the neighbors with whom we share data.
      */
-    private static Collection<InetAddress> getNeighbors()
+    private static Collection<InetAddress> getNeighbors(String table)
     {
         InetAddress local = FBUtilities.getLocalAddress();
         StorageService ss = StorageService.instance;
-        return Collections2.filter(ss.getNaturalEndpoints(ss.getLocalToken()),
+        return Collections2.filter(ss.getNaturalEndpoints(table, ss.getLocalToken()),
                                    Predicates.not(Predicates.equalTo(local)));
     }
 
@@ -158,7 +158,7 @@
      * @param endpoint The endpoint which owns the given tree.
      * @param tree The tree for the endpoint.
      */
-    void rendezvous(CFPair cf, InetAddress endpoint, MerkleTree tree)
+    private void rendezvous(CFPair cf, InetAddress endpoint, MerkleTree tree)
     {
         InetAddress LOCAL = FBUtilities.getLocalAddress();
 
@@ -169,7 +169,7 @@
         if (LOCAL.equals(endpoint))
         {
             // we're registering a local tree: rendezvous with all remote trees
-            for (InetAddress neighbor : getNeighbors())
+            for (InetAddress neighbor : getNeighbors(cf.left))
             {
                 TreePair waiting = ctrees.remove(neighbor);
                 if (waiting != null && waiting.right != null)
@@ -243,7 +243,7 @@
      * @param remote The remote endpoint for the rendezvous.
      * @return The tree pair for the given rendezvous if it exists, else  null.
      */
-    TreePair getRendezvousPair(String table, String cf, InetAddress remote)
+    TreePair getRendezvousPair_TestsOnly(String table, String cf, InetAddress remote)
     {
         return rendezvousPairs(new CFPair(table, cf)).get(remote);
     }
@@ -251,7 +251,7 @@
     /**
      * Should only be used for testing.
      */
-    void clearNaturalRepairs()
+    void clearNaturalRepairs_TestsOnly()
     {
         naturalRepairs.clear();
     }
@@ -484,7 +484,7 @@
             AntiEntropyService aes = AntiEntropyService.instance;
             InetAddress local = FBUtilities.getLocalAddress();
 
-            Collection<InetAddress> neighbors = getNeighbors();
+            Collection<InetAddress> neighbors = getNeighbors(cf.left);
 
             // store the local tree and then broadcast it to our neighbors
             aes.rendezvous(cf, local, tree);
@@ -562,8 +562,8 @@
                 rtree.partitioner(ss.getPartitioner());
 
             // determine the ranges where responsibility overlaps
-            Set<Range> interesting = new HashSet(ss.getRangesForEndPoint(local));
-            interesting.retainAll(ss.getRangesForEndPoint(remote));
+            Set<Range> interesting = new HashSet(ss.getRangesForEndPoint(cf.left, local));
+            interesting.retainAll(ss.getRangesForEndPoint(cf.left, remote));
 
             // compare trees, and filter out uninteresting differences
             for (MerkleTree.TreeRange diff : MerkleTree.difference(ltree, rtree))

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Thu Feb  4 15:21:31 2010
@@ -44,12 +44,12 @@
     private final Map<String, Integer> responseCounts;
     private final DatacenterEndPointSnitch endPointSnitch;
 
-    public DatacenterSyncWriteResponseHandler(Map<String, Integer> responseCounts)
+    public DatacenterSyncWriteResponseHandler(Map<String, Integer> responseCounts, String table)
     {
         // Response is been managed by the map so make it 1 for the superclass.
-        super(1);
+        super(1, table);
         this.responseCounts = responseCounts;
-        endPointSnitch = (DatacenterEndPointSnitch) DatabaseDescriptor.getEndPointSnitch();
+        endPointSnitch = (DatacenterEndPointSnitch) DatabaseDescriptor.getEndPointSnitch(table);
     }
 
     @Override

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java Thu Feb  4 15:21:31 2010
@@ -44,12 +44,12 @@
     private DatacenterEndPointSnitch endpointsnitch;
     private InetAddress localEndpoint;
 
-    public DatacenterWriteResponseHandler(int blockFor)
+    public DatacenterWriteResponseHandler(int blockFor, String table)
     {
         // Response is been managed by the map so the waitlist size really doesnt matter.
-        super(blockFor);
+        super(blockFor, table);
         this.blockFor = blockFor;
-        endpointsnitch = (DatacenterEndPointSnitch) DatabaseDescriptor.getEndPointSnitch();
+        endpointsnitch = (DatacenterEndPointSnitch) DatabaseDescriptor.getEndPointSnitch(table);
         localEndpoint = FBUtilities.getLocalAddress();
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Thu Feb  4 15:21:31 2010
@@ -49,7 +49,7 @@
 
     public ReadResponseResolver(String table, int responseCount)
     {
-        assert 1 <= responseCount && responseCount <= DatabaseDescriptor.getReplicationFactor()
+        assert 1 <= responseCount && responseCount <= DatabaseDescriptor.getReplicationFactor(table)
             : "invalid response count " + responseCount;
 
         this.responseCount = responseCount;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=906521&r1=906520&r2=906521&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Feb  4 15:21:31 2010
@@ -105,8 +105,8 @@
             {
                 try
                 {
-                    List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(rm.key());
-                    Map<InetAddress, InetAddress> endpointMap = StorageService.instance.getHintedEndpointMap(rm.key(), naturalEndpoints);
+                    List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(rm.getTable(), rm.key());
+                    Map<InetAddress, InetAddress> endpointMap = StorageService.instance.getHintedEndpointMap(rm.getTable(), rm.key(), naturalEndpoints);
                     Message unhintedMessage = null; // lazy initialize for non-local, unhinted writes
 
                     // 3 cases:
@@ -174,15 +174,15 @@
             for (RowMutation rm: mutations)
             {
                 mostRecentRowMutation = rm;
-                List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(rm.key());
-                Map<InetAddress, InetAddress> endpointMap = StorageService.instance.getHintedEndpointMap(rm.key(), naturalEndpoints);
+                List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(rm.getTable(), rm.key());
+                Map<InetAddress, InetAddress> endpointMap = StorageService.instance.getHintedEndpointMap(rm.getTable(), rm.key(), naturalEndpoints);
                 int blockFor = determineBlockFor(naturalEndpoints.size(), endpointMap.size(), consistency_level);
                 
                 // avoid starting a write we know can't achieve the required consistency
                 assureSufficientLiveNodes(endpointMap, blockFor, consistency_level);
                 
                 // send out the writes, as in insert() above, but this time with a callback that tracks responses
-                final WriteResponseHandler responseHandler = StorageService.instance.getWriteResponseHandler(blockFor, consistency_level);
+                final WriteResponseHandler responseHandler = StorageService.instance.getWriteResponseHandler(blockFor, consistency_level, rm.getTable());
                 responseHandlers.add(responseHandler);
                 Message unhintedMessage = null;
                 for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
@@ -337,7 +337,7 @@
 
         for (ReadCommand command: commands)
         {
-            InetAddress endPoint = StorageService.instance.findSuitableEndPoint(command.key);
+            InetAddress endPoint = StorageService.instance.findSuitableEndPoint(command.table, command.key);
             Message message = command.makeReadMessage();
 
             if (logger.isDebugEnabled())
@@ -376,7 +376,7 @@
 
             for (ReadCommand command: commands)
             {
-                List<InetAddress> endpoints = StorageService.instance.getNaturalEndpoints(command.key);
+                List<InetAddress> endpoints = StorageService.instance.getNaturalEndpoints(command.table, command.key);
                 boolean foundLocal = endpoints.contains(FBUtilities.getLocalAddress());
                 //TODO: Throw InvalidRequest if we're in bootstrap mode?
                 if (foundLocal && !StorageService.instance.isBootstrapMode())
@@ -423,7 +423,6 @@
         List<InetAddress[]> commandEndPoints = new ArrayList<InetAddress[]>();
         List<Row> rows = new ArrayList<Row>();
 
-        int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(), DatabaseDescriptor.getReplicationFactor(), consistency_level);
         int commandIndex = 0;
 
         for (ReadCommand command: commands)
@@ -434,8 +433,10 @@
             Message message = command.makeReadMessage();
             Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
 
-            InetAddress dataPoint = StorageService.instance.findSuitableEndPoint(command.key);
-            List<InetAddress> endpointList = StorageService.instance.getLiveNaturalEndpoints(command.key);
+            InetAddress dataPoint = StorageService.instance.findSuitableEndPoint(command.table, command.key);
+            List<InetAddress> endpointList = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
+            final String table = command.table;
+            int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(table), DatabaseDescriptor.getReplicationFactor(table), consistency_level);
             if (endpointList.size() < responseCount)
                 throw new UnavailableException();
 
@@ -452,7 +453,7 @@
                 if (logger.isDebugEnabled())
                     logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
             }
-            QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver(command.table, responseCount));
+            QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(command.table), new ReadResponseResolver(command.table, responseCount));
             MessagingService.instance.sendRR(messages, endPoints, quorumResponseHandler);
             quorumResponseHandlers.add(quorumResponseHandler);
             commandEndPoints.add(endPoints);
@@ -476,9 +477,9 @@
             {
                 if (DatabaseDescriptor.getConsistencyCheck())
                 {
-                    IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver(command.table, DatabaseDescriptor.getQuorum());
+                    IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver(command.table, DatabaseDescriptor.getQuorum(command.table));
                     QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
-                            DatabaseDescriptor.getQuorum(),
+                            DatabaseDescriptor.getQuorum(command.table),
                             readResponseResolverRepair);
                     logger.info("DigestMismatchException: " + ex.getMessage());
                     Message messageRepair = command.makeReadMessage();
@@ -539,13 +540,14 @@
 
         InetAddress endPoint = StorageService.instance.getPrimary(command.startKey.token);
         InetAddress startEndpoint = endPoint;
-        int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(), DatabaseDescriptor.getReplicationFactor(), consistency_level);
+        final String table = command.keyspace;
+        int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(table), DatabaseDescriptor.getReplicationFactor(table), consistency_level);
 
         Map<String, ColumnFamily> rows = new HashMap<String, ColumnFamily>(command.max_keys);
         do
         {
             Range primaryRange = StorageService.instance.getPrimaryRangeForEndPoint(endPoint);
-            List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(primaryRange.right);
+            List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace, primaryRange.right);
             if (endpoints.size() < responseCount)
                 throw new UnavailableException();
 
@@ -666,7 +668,7 @@
 
         public Object call() throws IOException
         {
-            List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.key);
+            List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
             /* Remove the local storage endpoint from the list. */
             endpoints.remove(FBUtilities.getLocalAddress());