You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2010/01/18 18:26:38 UTC

svn commit: r900469 - in /incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra: config/DatabaseDescriptor.java dht/BootStrapper.java io/Streaming.java service/StorageService.java

Author: gdusbabek
Date: Mon Jan 18 17:26:38 2010
New Revision: 900469

URL: http://svn.apache.org/viewvc?rev=900469&view=rev
Log:
ensure bootstrap works in multi keyspace environments and with empty keyspaces. Patch by Gary Dusbabek, reviewed by Jaakko Laine. CASSANDRA-673

Modified:
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/io/Streaming.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=900469&r1=900468&r2=900469&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Mon Jan 18 17:26:38 2010
@@ -763,6 +763,13 @@
         return tables_;
     }
 
+    public static List<String> getNonSystemTables()
+    {
+        List<String> tables = new ArrayList<String>(tables_);
+        tables.remove(Table.SYSTEM_TABLE);
+        return Collections.unmodifiableList(tables);
+    }
+
     public static String getTable(String tableName)
     {
         assert tableName != null;

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=900469&r1=900468&r2=900469&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/BootStrapper.java Mon Jan 18 17:26:38 2010
@@ -87,7 +87,8 @@
                 for (Map.Entry<InetAddress, Collection<Range>> entry : getWorkMap(rangesWithSourceTarget).asMap().entrySet())
                 {
                     InetAddress source = entry.getKey();
-                    StorageService.instance().addBootstrapSource(source);
+                    for (String table : DatabaseDescriptor.getNonSystemTables())
+                        StorageService.instance().addBootstrapSource(source, table);
                     if (logger.isDebugEnabled())
                         logger.debug("Requesting from " + source + " ranges " + StringUtils.join(entry.getValue(), ", "));
                     Streaming.requestRanges(source, entry.getValue());

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/io/Streaming.java?rev=900469&r1=900468&r2=900469&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/io/Streaming.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/io/Streaming.java Mon Jan 18 17:26:38 2010
@@ -50,6 +50,7 @@
 public class Streaming
 {
     private static Logger logger = Logger.getLogger(Streaming.class);
+    private static String TABLE_NAME = "STREAMING-TABLE-NAME";
     public static final long RING_DELAY = 30 * 1000; // delay after which we assume ring has stablized
 
     /**
@@ -126,6 +127,7 @@
         StreamManager.instance(target).addFilesToStream(streamContexts);
         StreamInitiateMessage biMessage = new StreamInitiateMessage(streamContexts);
         Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
+        message.addHeader(Streaming.TABLE_NAME, table.getBytes());
         if (logger.isDebugEnabled())
           logger.debug("Sending a stream initiate message to " + target + " ...");
         MessagingService.instance().sendOneWay(message, target);
@@ -163,6 +165,8 @@
             byte[] body = message.getMessageBody();
             DataInputBuffer bufIn = new DataInputBuffer();
             bufIn.reset(body, body.length);
+            if (logger.isDebugEnabled())
+                logger.debug(String.format("StreamInitiateVerbeHandler.doVerb %s %s %s", message.getVerb(), message.getMessageId(), message.getMessageType()));
 
             try
             {
@@ -173,7 +177,7 @@
                 {
                     if (logger.isDebugEnabled())
                         logger.debug("no data needed from " + message.getFrom());
-                    StorageService.instance().removeBootstrapSource(message.getFrom());
+                    StorageService.instance().removeBootstrapSource(message.getFrom(), new String(message.getHeader(Streaming.TABLE_NAME)));
                     return;
                 }
 
@@ -315,7 +319,7 @@
             /* If we're done with everything for this host, remove from bootstrap sources */
             if (StreamContextManager.isDone(host) && StorageService.instance().isBootstrapMode())
             {
-                StorageService.instance().removeBootstrapSource(host);
+                StorageService.instance().removeBootstrapSource(host, streamContext.getTable());
             }
         }
     }

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java?rev=900469&r1=900468&r2=900469&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java Mon Jan 18 17:26:38 2010
@@ -158,22 +158,25 @@
     private AbstractReplicationStrategy replicationStrategy_;
     /* Are we starting this node in bootstrap mode? */
     private boolean isBootstrapMode;
-    private Set<InetAddress> bootstrapSet;
+    private Multimap<InetAddress, String> bootstrapSet;
     /* when intialized as a client, we shouldn't write to the system table. */
     private boolean isClientMode;
   
-    public synchronized void addBootstrapSource(InetAddress s)
+    public synchronized void addBootstrapSource(InetAddress s, String table)
     {
         if (logger_.isDebugEnabled())
             logger_.debug("Added " + s + " as a bootstrap source");
-        bootstrapSet.add(s);
+        bootstrapSet.put(s, table);
     }
     
-    public synchronized void removeBootstrapSource(InetAddress s)
+    public synchronized void removeBootstrapSource(InetAddress s, String table)
     {
-        bootstrapSet.remove(s);
+        if (table == null)
+            bootstrapSet.removeAll(s);
+        else
+            bootstrapSet.remove(s, table);
         if (logger_.isDebugEnabled())
-            logger_.debug("Removed " + s + " as a bootstrap source; remaining is [" + StringUtils.join(bootstrapSet, ", ") + "]");
+            logger_.debug("Removed " + s + " as a bootstrap source; remaining is [" + StringUtils.join(bootstrapSet.keySet(), ", ") + "]");
 
         if (bootstrapSet.isEmpty())
         {
@@ -211,7 +214,7 @@
             throw new RuntimeException(e);
         }
 
-        bootstrapSet = new HashSet<InetAddress>();
+        bootstrapSet = HashMultimap.create();
         endPointSnitch_ = DatabaseDescriptor.getEndPointSnitch();
 
         /* register the verb handlers */