You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/11 17:11:45 UTC
svn commit: r834939 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/io/
src/java/org/apache/cassandra/net/io/
src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/dht/
Author: jbellis
Date: Wed Nov 11 16:11:44 2009
New Revision: 834939
URL: http://svn.apache.org/viewvc?rev=834939&view=rev
Log:
move more generic streaming code into Streaming.java
patch by jbellis; reviewed by Jaakko Laine for CASSANDRA-435
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
- copied, changed from r834937, incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=834939&r1=834938&r2=834939&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Wed Nov 11 16:11:44 2009
@@ -57,9 +57,9 @@
*
* - bootstrapTokenVerb asks the most-loaded node what Token to use to split its Range in two.
* - bootstrapMetadataVerb tells source nodes to send us the necessary Ranges
- * - source nodes send bootStrapInitiateVerb to us to say "get ready to receive data" [if there is data to send]
- * - when we have everything set up to receive the data, we send bootStrapInitiateDoneVerb back to the source nodes and they start streaming
- * - when streaming is complete, we send bootStrapTerminateVerb to the source so it can clean up on its end
+ * - source nodes send streamInitiateVerb to us to say "get ready to receive data" [if there is data to send]
+ * - when we have everything set up to receive the data, we send streamInitiateDoneVerb back to the source nodes and they start streaming
+ * - when streaming is complete, we send streamFinishedVerb to the source so it can clean up on its end
*/
public class BootStrapper
{
@@ -241,17 +241,6 @@
}
}
- public static class BootstrapInitiateDoneVerbHandler implements IVerbHandler
- {
- public void doVerb(Message message)
- {
- if (logger.isDebugEnabled())
- logger.debug("Received a bootstrap initiate done message ...");
- /* Let the Stream Manager do his thing. */
- StreamManager.instance(message.getFrom()).start();
- }
- }
-
private static class BootstrapTokenCallback implements IAsyncCallback
{
private volatile Token<?> token;
@@ -283,194 +272,4 @@
condition.signalAll();
}
}
-
- public static class BootStrapInitiateVerbHandler implements IVerbHandler
- {
- /*
- * Here we handle the BootstrapInitiateMessage. Here we get the
- * array of StreamContexts. We get file names for the column
- * families associated with the files and replace them with the
- * file names as obtained from the column family store on the
- * receiving end.
- */
- public void doVerb(Message message)
- {
- byte[] body = message.getMessageBody();
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(body, body.length);
-
- try
- {
- BootstrapInitiateMessage biMsg = BootstrapInitiateMessage.serializer().deserialize(bufIn);
- StreamContextManager.StreamContext[] streamContexts = biMsg.getStreamContext();
-
- Map<String, String> fileNames = getNewNames(streamContexts);
- /*
- * For each of stream context's in the incoming message
- * generate the new file names and store the new file names
- * in the StreamContextManager.
- */
- for (StreamContextManager.StreamContext streamContext : streamContexts )
- {
- StreamContextManager.StreamStatus streamStatus = new StreamContextManager.StreamStatus(streamContext.getTargetFile(), streamContext.getExpectedBytes() );
- String file = getNewFileNameFromOldContextAndNames(fileNames, streamContext);
-
- //String file = DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName + "-Data.db";
- if (logger.isDebugEnabled())
- logger.debug("Received Data from : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
- streamContext.setTargetFile(file);
- addStreamContext(message.getFrom(), streamContext, streamStatus);
- }
-
- StreamContextManager.registerStreamCompletionHandler(message.getFrom(), new BootstrapCompletionHandler());
- /* Send a bootstrap initiation done message to execute on default stage. */
- if (logger.isDebugEnabled())
- logger.debug("Sending a bootstrap initiate done message ...");
- Message doneMessage = new Message(FBUtilities.getLocalAddress(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new byte[0] );
- MessagingService.instance().sendOneWay(doneMessage, message.getFrom());
- }
- catch ( IOException ex )
- {
- logger.info(LogUtil.throwableToString(ex));
- }
- }
-
- String getNewFileNameFromOldContextAndNames(Map<String, String> fileNames,
- StreamContextManager.StreamContext streamContext)
- {
- File sourceFile = new File( streamContext.getTargetFile() );
- String[] piece = FBUtilities.strip(sourceFile.getName(), "-");
- String cfName = piece[0];
- String ssTableNum = piece[1];
- String typeOfFile = piece[2];
-
- String newFileNameExpanded = fileNames.get( streamContext.getTable() + "-" + cfName + "-" + ssTableNum );
- //Drop type (Data.db) from new FileName
- String newFileName = newFileNameExpanded.replace("Data.db", typeOfFile);
- return DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName;
- }
-
- Map<String, String> getNewNames(StreamContextManager.StreamContext[] streamContexts) throws IOException
- {
- /*
- * Mapping for each file with unique CF-i ---> new file name. For eg.
- * for a file with name <CF>-<i>-Data.db there is a corresponding
- * <CF>-<i>-Index.db. We maintain a mapping from <CF>-<i> to a newly
- * generated file name.
- */
- Map<String, String> fileNames = new HashMap<String, String>();
- /* Get the distinct entries from StreamContexts i.e have one entry per Data/Index/Filter file set */
- Set<String> distinctEntries = new HashSet<String>();
- for ( StreamContextManager.StreamContext streamContext : streamContexts )
- {
- String[] pieces = FBUtilities.strip(new File(streamContext.getTargetFile()).getName(), "-");
- distinctEntries.add(streamContext.getTable() + "-" + pieces[0] + "-" + pieces[1] );
- }
-
- /* Generate unique file names per entry */
- for ( String distinctEntry : distinctEntries )
- {
- String tableName;
- String[] pieces = FBUtilities.strip(distinctEntry, "-");
- tableName = pieces[0];
- Table table = Table.open( tableName );
-
- ColumnFamilyStore cfStore = table.getColumnFamilyStore(pieces[1]);
- if (logger.isDebugEnabled())
- logger.debug("Generating file name for " + distinctEntry + " ...");
- fileNames.put(distinctEntry, cfStore.getTempSSTableFileName());
- }
-
- return fileNames;
- }
-
- private void addStreamContext(InetAddress host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
- {
- if (logger.isDebugEnabled())
- logger.debug("Adding stream context " + streamContext + " for " + host + " ...");
- StreamContextManager.addStreamContext(host, streamContext, streamStatus);
- }
- }
-
- /**
- * This is the callback handler that is invoked when we have
- * completely been bootstrapped for a single file by a remote host.
- *
- * TODO if we move this into CFS we could make addSSTables private, improving encapsulation.
- */
- private static class BootstrapCompletionHandler implements IStreamComplete
- {
- public void onStreamCompletion(InetAddress host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException
- {
- /* Parse the stream context and the file to the list of SSTables in the associated Column Family Store. */
- if (streamContext.getTargetFile().contains("-Data.db"))
- {
- String tableName = streamContext.getTable();
- File file = new File( streamContext.getTargetFile() );
- String fileName = file.getName();
- String [] temp = fileName.split("-");
-
- //Open the file to see if all parts are now here
- try
- {
- SSTableReader sstable = SSTableWriter.renameAndOpen(streamContext.getTargetFile());
- //TODO add a sanity check that this sstable has all its parts and is ok
- Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
- logger.info("Bootstrap added " + sstable.getFilename());
- }
- catch (IOException e)
- {
- logger.error("Not able to bootstrap with file " + streamContext.getTargetFile(), e);
- }
- }
-
- if (logger.isDebugEnabled())
- logger.debug("Sending a bootstrap terminate message with " + streamStatus + " to " + host);
- /* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
- StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
- Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
- MessagingService.instance().sendOneWay(message, host);
- /* If we're done with everything for this host, remove from bootstrap sources */
- if (StreamContextManager.isDone(host))
- StorageService.instance().removeBootstrapSource(host);
- }
- }
-
- public static class BootstrapTerminateVerbHandler implements IVerbHandler
- {
- private static Logger logger_ = Logger.getLogger( BootstrapTerminateVerbHandler.class );
-
- public void doVerb(Message message)
- {
- byte[] body = message.getMessageBody();
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(body, body.length);
-
- try
- {
- StreamContextManager.StreamStatusMessage streamStatusMessage = StreamContextManager.StreamStatusMessage.serializer().deserialize(bufIn);
- StreamContextManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
-
- switch( streamStatus.getAction() )
- {
- case DELETE:
- StreamManager.instance(message.getFrom()).finish(streamStatus.getFile());
- break;
-
- case STREAM:
- if (logger_.isDebugEnabled())
- logger_.debug("Need to re-stream file " + streamStatus.getFile());
- StreamManager.instance(message.getFrom()).repeat();
- break;
-
- default:
- break;
- }
- }
- catch ( IOException ex )
- {
- logger_.info(LogUtil.throwableToString(ex));
- }
- }
- }
}
Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java (from r834937, incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java&r1=834937&r2=834939&rev=834939&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java Wed Nov 11 16:11:44 2009
@@ -30,31 +30,31 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-public class BootstrapInitiateMessage implements Serializable
+public class StreamInitiateMessage implements Serializable
{
- private static ICompactSerializer<BootstrapInitiateMessage> serializer_;
-
+ private static ICompactSerializer<StreamInitiateMessage> serializer_;
+
static
{
- serializer_ = new BootstrapInitiateMessageSerializer();
+ serializer_ = new StreamInitiateMessageSerializer();
}
- public static ICompactSerializer<BootstrapInitiateMessage> serializer()
+ public static ICompactSerializer<StreamInitiateMessage> serializer()
{
return serializer_;
}
- public static Message makeBootstrapInitiateMessage(BootstrapInitiateMessage biMessage) throws IOException
+ public static Message makeStreamInitiateMessage(StreamInitiateMessage biMessage) throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
- BootstrapInitiateMessage.serializer().serialize(biMessage, dos);
- return new Message(FBUtilities.getLocalAddress(), "", StorageService.bootStrapInitiateVerbHandler_, bos.toByteArray() );
+ StreamInitiateMessage.serializer().serialize(biMessage, dos);
+ return new Message(FBUtilities.getLocalAddress(), "", StorageService.streamInitiateVerbHandler_, bos.toByteArray() );
}
protected StreamContextManager.StreamContext[] streamContexts_ = new StreamContextManager.StreamContext[0];
- public BootstrapInitiateMessage(StreamContextManager.StreamContext[] streamContexts)
+ public StreamInitiateMessage(StreamContextManager.StreamContext[] streamContexts)
{
streamContexts_ = streamContexts;
}
@@ -65,9 +65,9 @@
}
}
-class BootstrapInitiateMessageSerializer implements ICompactSerializer<BootstrapInitiateMessage>
+class StreamInitiateMessageSerializer implements ICompactSerializer<StreamInitiateMessage>
{
- public void serialize(BootstrapInitiateMessage bim, DataOutputStream dos) throws IOException
+ public void serialize(StreamInitiateMessage bim, DataOutputStream dos) throws IOException
{
dos.writeInt(bim.streamContexts_.length);
for ( StreamContextManager.StreamContext streamContext : bim.streamContexts_ )
@@ -76,7 +76,7 @@
}
}
- public BootstrapInitiateMessage deserialize(DataInputStream dis) throws IOException
+ public StreamInitiateMessage deserialize(DataInputStream dis) throws IOException
{
int size = dis.readInt();
StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[0];
@@ -88,7 +88,7 @@
streamContexts[i] = StreamContextManager.StreamContext.serializer().deserialize(dis);
}
}
- return new BootstrapInitiateMessage(streamContexts);
+ return new StreamInitiateMessage(streamContexts);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=834939&r1=834938&r2=834939&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java Wed Nov 11 16:11:44 2009
@@ -1,32 +1,35 @@
package org.apache.cassandra.io;
import java.net.InetAddress;
-import java.util.Collection;
-import java.util.List;
-import java.util.ArrayList;
+import java.util.*;
import java.io.IOException;
import java.io.File;
+import java.io.IOError;
import org.apache.log4j.Logger;
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.BootstrapInitiateMessage;
+import org.apache.cassandra.dht.StreamInitiateMessage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.net.io.IStreamComplete;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.service.StreamManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LogUtil;
public class Streaming
{
private static Logger logger = Logger.getLogger(Streaming.class);
- /*
- * This method needs to figure out the files on disk
- * locally for each range and then stream them using
- * the Bootstrap protocol to the target endpoint.
+ /**
+ * split out files on disk locally for each range and then stream them to the target endpoint
*/
public static void transferRanges(InetAddress target, Collection<Range> ranges) throws IOException
{
@@ -36,12 +39,9 @@
logger.debug("Beginning transfer process to " + target + " for ranges " + StringUtils.join(ranges, ", "));
/*
- * (1) First we dump all the memtables to disk.
- * (2) Run a version of compaction which will basically
- * put the keys in the range specified into a directory
- * named as per the endpoint it is destined for inside the
- * bootstrap directory.
- * (3) Handoff the data.
+ * (1) dump all the memtables to disk.
+ * (2) anticompaction -- split out the keys in the range specified
+ * (3) transfer the data.
*/
List<String> tables = DatabaseDescriptor.getTables();
for (String tName : tables)
@@ -62,10 +62,6 @@
}
}
- /**
- * Stream the files in the bootstrap directory over to the
- * node being bootstrapped.
- */
private static void transferOneTable(InetAddress target, List<String> fileList, String table) throws IOException
{
if (fileList.isEmpty())
@@ -81,13 +77,11 @@
logger.debug("Stream context metadata " + streamContexts[i]);
}
- /* Set up the stream manager with the files that need to streamed */
StreamManager.instance(target).addFilesToStream(streamContexts);
- /* Send the bootstrap initiate message */
- BootstrapInitiateMessage biMessage = new BootstrapInitiateMessage(streamContexts);
- Message message = BootstrapInitiateMessage.makeBootstrapInitiateMessage(biMessage);
+ StreamInitiateMessage biMessage = new StreamInitiateMessage(streamContexts);
+ Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
if (logger.isDebugEnabled())
- logger.debug("Sending a bootstrap initiate message to " + target + " ...");
+ logger.debug("Sending a stream initiate message to " + target + " ...");
MessagingService.instance().sendOneWay(message, target);
if (logger.isDebugEnabled())
logger.debug("Waiting for transfer to " + target + " to complete");
@@ -95,4 +89,202 @@
if (logger.isDebugEnabled())
logger.debug("Done with transfer to " + target);
}
+
+ public static class StreamInitiateVerbHandler implements IVerbHandler
+ {
+ /*
+ * Handles the StreamInitiateMessage: reads the array of
+ * StreamContexts it carries, then replaces the file name in
+ * each context (the name used on the sending node) with a new
+ * file name obtained from the column family store on the
+ * receiving end.
+ */
+ public void doVerb(Message message)
+ {
+ byte[] body = message.getMessageBody();
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(body, body.length);
+
+ try
+ {
+ StreamInitiateMessage biMsg = StreamInitiateMessage.serializer().deserialize(bufIn);
+ StreamContextManager.StreamContext[] streamContexts = biMsg.getStreamContext();
+
+ Map<String, String> fileNames = getNewNames(streamContexts);
+ /*
+ * For each stream context in the incoming message,
+ * generate the new file name and store it
+ * in the StreamContextManager.
+ */
+ for (StreamContextManager.StreamContext streamContext : streamContexts )
+ {
+ StreamContextManager.StreamStatus streamStatus = new StreamContextManager.StreamStatus(streamContext.getTargetFile(), streamContext.getExpectedBytes() );
+ String file = getNewFileNameFromOldContextAndNames(fileNames, streamContext);
+
+ //String file = DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName + "-Data.db";
+ if (logger.isDebugEnabled())
+ logger.debug("Received Data from : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
+ streamContext.setTargetFile(file);
+ addStreamContext(message.getFrom(), streamContext, streamStatus);
+ }
+
+ StreamContextManager.registerStreamCompletionHandler(message.getFrom(), new StreamCompletionHandler());
+ if (logger.isDebugEnabled())
+ logger.debug("Sending a stream initiate done message ...");
+ Message doneMessage = new Message(FBUtilities.getLocalAddress(), "", StorageService.streamInitiateDoneVerbHandler_, new byte[0] );
+ MessagingService.instance().sendOneWay(doneMessage, message.getFrom());
+ }
+ catch ( IOException ex )
+ {
+ logger.info(LogUtil.throwableToString(ex));
+ }
+ }
+
+ public String getNewFileNameFromOldContextAndNames(Map<String, String> fileNames,
+ StreamContextManager.StreamContext streamContext)
+ {
+ File sourceFile = new File( streamContext.getTargetFile() );
+ String[] piece = FBUtilities.strip(sourceFile.getName(), "-");
+ String cfName = piece[0];
+ String ssTableNum = piece[1];
+ String typeOfFile = piece[2];
+
+ String newFileNameExpanded = fileNames.get( streamContext.getTable() + "-" + cfName + "-" + ssTableNum );
+ //Replace the type (Data.db) in the new file name with this file's own type
+ String newFileName = newFileNameExpanded.replace("Data.db", typeOfFile);
+ return DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName;
+ }
+
+ public Map<String, String> getNewNames(StreamContextManager.StreamContext[] streamContexts) throws IOException
+ {
+ /*
+ * Mapping for each file with unique CF-i ---> new file name. For example,
+ * for a file with name <CF>-<i>-Data.db there is a corresponding
+ * <CF>-<i>-Index.db. We maintain a mapping from <CF>-<i> to a newly
+ * generated file name.
+ */
+ Map<String, String> fileNames = new HashMap<String, String>();
+ /* Get the distinct entries from StreamContexts, i.e. have one entry per Data/Index/Filter file set */
+ Set<String> distinctEntries = new HashSet<String>();
+ for ( StreamContextManager.StreamContext streamContext : streamContexts )
+ {
+ String[] pieces = FBUtilities.strip(new File(streamContext.getTargetFile()).getName(), "-");
+ distinctEntries.add(streamContext.getTable() + "-" + pieces[0] + "-" + pieces[1] );
+ }
+
+ /* Generate unique file names per entry */
+ for ( String distinctEntry : distinctEntries )
+ {
+ String tableName;
+ String[] pieces = FBUtilities.strip(distinctEntry, "-");
+ tableName = pieces[0];
+ Table table = Table.open( tableName );
+
+ ColumnFamilyStore cfStore = table.getColumnFamilyStore(pieces[1]);
+ if (logger.isDebugEnabled())
+ logger.debug("Generating file name for " + distinctEntry + " ...");
+ fileNames.put(distinctEntry, cfStore.getTempSSTableFileName());
+ }
+
+ return fileNames;
+ }
+
+ private void addStreamContext(InetAddress host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Adding stream context " + streamContext + " for " + host + " ...");
+ StreamContextManager.addStreamContext(host, streamContext, streamStatus);
+ }
+ }
+
+ public static class StreamInitiateDoneVerbHandler implements IVerbHandler
+ {
+ public void doVerb(Message message)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Received a stream initiate done message ...");
+ StreamManager.instance(message.getFrom()).start();
+ }
+ }
+
+ /**
+ * This is the callback handler that is invoked when we have
+ * completely received a single file from a remote host.
+ *
+ * TODO if we move this into CFS we could make addSSTables private, improving encapsulation.
+ */
+ private static class StreamCompletionHandler implements IStreamComplete
+ {
+ public void onStreamCompletion(InetAddress host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException
+ {
+ /* Parse the stream context and add the file to the list of SSTables in the associated Column Family Store. */
+ if (streamContext.getTargetFile().contains("-Data.db"))
+ {
+ String tableName = streamContext.getTable();
+ File file = new File( streamContext.getTargetFile() );
+ String fileName = file.getName();
+ String [] temp = fileName.split("-");
+
+ //Open the file to see if all parts are now here
+ try
+ {
+ SSTableReader sstable = SSTableWriter.renameAndOpen(streamContext.getTargetFile());
+ //TODO add a sanity check that this sstable has all its parts and is ok
+ Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
+ logger.info("Streaming added " + sstable.getFilename());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Not able to add streamed file " + streamContext.getTargetFile(), e);
+ }
+ }
+
+ if (logger.isDebugEnabled())
+ logger.debug("Sending a streaming finished message with " + streamStatus + " to " + host);
+ /* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
+ StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
+ Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
+ MessagingService.instance().sendOneWay(message, host);
+
+ /* If we're done with everything for this host, remove from bootstrap sources */
+ if (StorageService.instance().isBootstrapMode() && StreamContextManager.isDone(host))
+ StorageService.instance().removeBootstrapSource(host);
+ }
+ }
+
+ public static class StreamFinishedVerbHandler implements IVerbHandler
+ {
+ public void doVerb(Message message)
+ {
+ byte[] body = message.getMessageBody();
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(body, body.length);
+
+ try
+ {
+ StreamContextManager.StreamStatusMessage streamStatusMessage = StreamContextManager.StreamStatusMessage.serializer().deserialize(bufIn);
+ StreamContextManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
+
+ switch (streamStatus.getAction())
+ {
+ case DELETE:
+ StreamManager.instance(message.getFrom()).finish(streamStatus.getFile());
+ break;
+
+ case STREAM:
+ if (logger.isDebugEnabled())
+ logger.debug("Need to re-stream file " + streamStatus.getFile());
+ StreamManager.instance(message.getFrom()).repeat();
+ break;
+
+ default:
+ break;
+ }
+ }
+ catch (IOException ex)
+ {
+ throw new IOError(ex);
+ }
+ }
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java?rev=834939&r1=834938&r2=834939&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java Wed Nov 11 16:11:44 2009
@@ -220,7 +220,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
StreamStatusMessage.serializer().serialize(streamStatusMessage, dos);
- return new Message(FBUtilities.getLocalAddress(), "", StorageService.bootStrapTerminateVerbHandler_, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), "", StorageService.streamFinishedVerbHandler_, bos.toByteArray());
}
protected StreamContextManager.StreamStatus streamStatus_;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=834939&r1=834938&r2=834939&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Nov 11 16:11:44 2009
@@ -39,6 +39,7 @@
import org.apache.cassandra.utils.LogUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.Streaming;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
@@ -68,9 +69,9 @@
public final static String binaryVerbHandler_ = "BINARY-VERB-HANDLER";
public final static String readRepairVerbHandler_ = "READ-REPAIR-VERB-HANDLER";
public final static String readVerbHandler_ = "ROW-READ-VERB-HANDLER";
- public final static String bootStrapInitiateVerbHandler_ = "BOOTSTRAP-INITIATE-VERB-HANDLER";
- public final static String bootStrapInitiateDoneVerbHandler_ = "BOOTSTRAP-INITIATE-DONE-VERB-HANDLER";
- public final static String bootStrapTerminateVerbHandler_ = "BOOTSTRAP-TERMINATE-VERB-HANDLER";
+ public final static String streamInitiateVerbHandler_ = "BOOTSTRAP-INITIATE-VERB-HANDLER";
+ public final static String streamInitiateDoneVerbHandler_ = "BOOTSTRAP-INITIATE-DONE-VERB-HANDLER";
+ public final static String streamFinishedVerbHandler_ = "BOOTSTRAP-TERMINATE-VERB-HANDLER";
public final static String dataFileVerbHandler_ = "DATA-FILE-VERB-HANDLER";
public final static String bootstrapMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
public final static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
@@ -219,9 +220,9 @@
// see BootStrapper for a summary of how the bootstrap verbs interact
MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
MessagingService.instance().registerVerbHandlers(bootstrapMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );
- MessagingService.instance().registerVerbHandlers(bootStrapInitiateVerbHandler_, new BootStrapper.BootStrapInitiateVerbHandler());
- MessagingService.instance().registerVerbHandlers(bootStrapInitiateDoneVerbHandler_, new BootStrapper.BootstrapInitiateDoneVerbHandler());
- MessagingService.instance().registerVerbHandlers(bootStrapTerminateVerbHandler_, new BootStrapper.BootstrapTerminateVerbHandler());
+ MessagingService.instance().registerVerbHandlers(streamInitiateVerbHandler_, new Streaming.StreamInitiateVerbHandler());
+ MessagingService.instance().registerVerbHandlers(streamInitiateDoneVerbHandler_, new Streaming.StreamInitiateDoneVerbHandler());
+ MessagingService.instance().registerVerbHandlers(streamFinishedVerbHandler_, new Streaming.StreamFinishedVerbHandler());
StageManager.registerStage(StorageService.mutationStage_,
new MultiThreadedStage(StorageService.mutationStage_, DatabaseDescriptor.getConcurrentWriters()));
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java?rev=834939&r1=834938&r2=834939&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java Wed Nov 11 16:11:44 2009
@@ -23,16 +23,10 @@
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.dht.*;
-import java.net.InetAddress;
import org.apache.cassandra.net.io.StreamContextManager;
-import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.Streaming;
import org.junit.Test;
@@ -45,15 +39,15 @@
streamContexts[0] = new StreamContextManager.StreamContext("/foo/Standard1-500-Data.db", 100, "Keyspace1");
streamContexts[1] = new StreamContextManager.StreamContext("/foo/Standard1-500-Index.db", 100, "Keyspace1");
streamContexts[2] = new StreamContextManager.StreamContext("/foo/Standard1-500-Filter.db", 100, "Keyspace1");
- BootStrapper.BootStrapInitiateVerbHandler bivh = new BootStrapper.BootStrapInitiateVerbHandler();
+ Streaming.StreamInitiateVerbHandler bivh = new Streaming.StreamInitiateVerbHandler();
Map<String, String> fileNames = bivh.getNewNames(streamContexts);
String result = fileNames.get("Keyspace1-Standard1-500");
assertEquals(true, result.contains("Standard1"));
assertEquals(true, result.contains("Data.db"));
assertEquals(1, fileNames.entrySet().size());
-
- assertTrue( new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, streamContexts[0])).getName().matches("Standard1-tmp-\\d+-Data.db"));
- assertTrue( new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, streamContexts[1])).getName().matches("Standard1-tmp-\\d+-Index.db"));
- assertTrue( new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, streamContexts[2])).getName().matches("Standard1-tmp-\\d+-Filter.db"));
+
+ assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, streamContexts[0])).getName().matches("Standard1-tmp-\\d+-Data.db"));
+ assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, streamContexts[1])).getName().matches("Standard1-tmp-\\d+-Index.db"));
+ assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, streamContexts[2])).getName().matches("Standard1-tmp-\\d+-Filter.db"));
}
}