You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2010/01/18 18:26:38 UTC
svn commit: r900469 - in
/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra:
config/DatabaseDescriptor.java dht/BootStrapper.java io/Streaming.java
service/StorageService.java
Author: gdusbabek
Date: Mon Jan 18 17:26:38 2010
New Revision: 900469
URL: http://svn.apache.org/viewvc?rev=900469&view=rev
Log:
ensure bootstrap works in multi keyspace environments and with empty keyspaces. Patch by Gary Dusbabek, reviewed by Jaakko Laine. CASSANDRA-673
Modified:
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/io/Streaming.java
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java
Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=900469&r1=900468&r2=900469&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Mon Jan 18 17:26:38 2010
@@ -763,6 +763,13 @@
return tables_;
}
+ public static List<String> getNonSystemTables()
+ {
+ List<String> tables = new ArrayList<String>(tables_);
+ tables.remove(Table.SYSTEM_TABLE);
+ return Collections.unmodifiableList(tables);
+ }
+
public static String getTable(String tableName)
{
assert tableName != null;
Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=900469&r1=900468&r2=900469&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/BootStrapper.java Mon Jan 18 17:26:38 2010
@@ -87,7 +87,8 @@
for (Map.Entry<InetAddress, Collection<Range>> entry : getWorkMap(rangesWithSourceTarget).asMap().entrySet())
{
InetAddress source = entry.getKey();
- StorageService.instance().addBootstrapSource(source);
+ for (String table : DatabaseDescriptor.getNonSystemTables())
+ StorageService.instance().addBootstrapSource(source, table);
if (logger.isDebugEnabled())
logger.debug("Requesting from " + source + " ranges " + StringUtils.join(entry.getValue(), ", "));
Streaming.requestRanges(source, entry.getValue());
Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/io/Streaming.java?rev=900469&r1=900468&r2=900469&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/io/Streaming.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/io/Streaming.java Mon Jan 18 17:26:38 2010
@@ -50,6 +50,7 @@
public class Streaming
{
private static Logger logger = Logger.getLogger(Streaming.class);
+ private static String TABLE_NAME = "STREAMING-TABLE-NAME";
public static final long RING_DELAY = 30 * 1000; // delay after which we assume ring has stablized
/**
@@ -126,6 +127,7 @@
StreamManager.instance(target).addFilesToStream(streamContexts);
StreamInitiateMessage biMessage = new StreamInitiateMessage(streamContexts);
Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
+ message.addHeader(Streaming.TABLE_NAME, table.getBytes());
if (logger.isDebugEnabled())
logger.debug("Sending a stream initiate message to " + target + " ...");
MessagingService.instance().sendOneWay(message, target);
@@ -163,6 +165,8 @@
byte[] body = message.getMessageBody();
DataInputBuffer bufIn = new DataInputBuffer();
bufIn.reset(body, body.length);
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("StreamInitiateVerbeHandler.doVerb %s %s %s", message.getVerb(), message.getMessageId(), message.getMessageType()));
try
{
@@ -173,7 +177,7 @@
{
if (logger.isDebugEnabled())
logger.debug("no data needed from " + message.getFrom());
- StorageService.instance().removeBootstrapSource(message.getFrom());
+ StorageService.instance().removeBootstrapSource(message.getFrom(), new String(message.getHeader(Streaming.TABLE_NAME)));
return;
}
@@ -315,7 +319,7 @@
/* If we're done with everything for this host, remove from bootstrap sources */
if (StreamContextManager.isDone(host) && StorageService.instance().isBootstrapMode())
{
- StorageService.instance().removeBootstrapSource(host);
+ StorageService.instance().removeBootstrapSource(host, streamContext.getTable());
}
}
}
Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java?rev=900469&r1=900468&r2=900469&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java Mon Jan 18 17:26:38 2010
@@ -158,22 +158,25 @@
private AbstractReplicationStrategy replicationStrategy_;
/* Are we starting this node in bootstrap mode? */
private boolean isBootstrapMode;
- private Set<InetAddress> bootstrapSet;
+ private Multimap<InetAddress, String> bootstrapSet;
/* when intialized as a client, we shouldn't write to the system table. */
private boolean isClientMode;
- public synchronized void addBootstrapSource(InetAddress s)
+ public synchronized void addBootstrapSource(InetAddress s, String table)
{
if (logger_.isDebugEnabled())
logger_.debug("Added " + s + " as a bootstrap source");
- bootstrapSet.add(s);
+ bootstrapSet.put(s, table);
}
- public synchronized void removeBootstrapSource(InetAddress s)
+ public synchronized void removeBootstrapSource(InetAddress s, String table)
{
- bootstrapSet.remove(s);
+ if (table == null)
+ bootstrapSet.removeAll(s);
+ else
+ bootstrapSet.remove(s, table);
if (logger_.isDebugEnabled())
- logger_.debug("Removed " + s + " as a bootstrap source; remaining is [" + StringUtils.join(bootstrapSet, ", ") + "]");
+ logger_.debug("Removed " + s + " as a bootstrap source; remaining is [" + StringUtils.join(bootstrapSet.keySet(), ", ") + "]");
if (bootstrapSet.isEmpty())
{
@@ -211,7 +214,7 @@
throw new RuntimeException(e);
}
- bootstrapSet = new HashSet<InetAddress>();
+ bootstrapSet = HashMultimap.create();
endPointSnitch_ = DatabaseDescriptor.getEndPointSnitch();
/* register the verb handlers */