You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/11 17:11:45 UTC

svn commit: r834939 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/net/io/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/dht/

Author: jbellis
Date: Wed Nov 11 16:11:44 2009
New Revision: 834939

URL: http://svn.apache.org/viewvc?rev=834939&view=rev
Log:
move more generic streaming code into Streaming.java
patch by jbellis; reviewed by Jaakko Laine for CASSANDRA-435

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
      - copied, changed from r834937, incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java
Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=834939&r1=834938&r2=834939&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Wed Nov 11 16:11:44 2009
@@ -57,9 +57,9 @@
   *
   *  - bootstrapTokenVerb asks the most-loaded node what Token to use to split its Range in two.
   *  - bootstrapMetadataVerb tells source nodes to send us the necessary Ranges
-  *  - source nodes send bootStrapInitiateVerb to us to say "get ready to receive data" [if there is data to send]
-  *  - when we have everything set up to receive the data, we send bootStrapInitiateDoneVerb back to the source nodes and they start streaming
-  *  - when streaming is complete, we send bootStrapTerminateVerb to the source so it can clean up on its end
+  *  - source nodes send streamInitiateVerb to us to say "get ready to receive data" [if there is data to send]
+  *  - when we have everything set up to receive the data, we send streamInitiateDoneVerb back to the source nodes and they start streaming
+  *  - when streaming is complete, we send streamFinishedVerb to the source so it can clean up on its end
   */
 public class BootStrapper
 {
@@ -241,17 +241,6 @@
         }
     }
 
-    public static class BootstrapInitiateDoneVerbHandler implements IVerbHandler
-    {
-        public void doVerb(Message message)
-        {
-            if (logger.isDebugEnabled())
-              logger.debug("Received a bootstrap initiate done message ...");
-            /* Let the Stream Manager do his thing. */
-            StreamManager.instance(message.getFrom()).start();
-        }
-    }
-
     private static class BootstrapTokenCallback implements IAsyncCallback
     {
         private volatile Token<?> token;
@@ -283,194 +272,4 @@
             condition.signalAll();
         }
     }
-
-    public static class BootStrapInitiateVerbHandler implements IVerbHandler
-    {
-        /*
-         * Here we handle the BootstrapInitiateMessage. Here we get the
-         * array of StreamContexts. We get file names for the column
-         * families associated with the files and replace them with the
-         * file names as obtained from the column family store on the
-         * receiving end.
-        */
-        public void doVerb(Message message)
-        {
-            byte[] body = message.getMessageBody();
-            DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(body, body.length);
-
-            try
-            {
-                BootstrapInitiateMessage biMsg = BootstrapInitiateMessage.serializer().deserialize(bufIn);
-                StreamContextManager.StreamContext[] streamContexts = biMsg.getStreamContext();
-
-                Map<String, String> fileNames = getNewNames(streamContexts);
-                /*
-                 * For each of stream context's in the incoming message
-                 * generate the new file names and store the new file names
-                 * in the StreamContextManager.
-                */
-                for (StreamContextManager.StreamContext streamContext : streamContexts )
-                {
-                    StreamContextManager.StreamStatus streamStatus = new StreamContextManager.StreamStatus(streamContext.getTargetFile(), streamContext.getExpectedBytes() );
-                    String file = getNewFileNameFromOldContextAndNames(fileNames, streamContext);
-
-                    //String file = DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName + "-Data.db";
-                    if (logger.isDebugEnabled())
-                      logger.debug("Received Data from  : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
-                    streamContext.setTargetFile(file);
-                    addStreamContext(message.getFrom(), streamContext, streamStatus);
-                }
-
-                StreamContextManager.registerStreamCompletionHandler(message.getFrom(), new BootstrapCompletionHandler());
-                /* Send a bootstrap initiation done message to execute on default stage. */
-                if (logger.isDebugEnabled())
-                  logger.debug("Sending a bootstrap initiate done message ...");
-                Message doneMessage = new Message(FBUtilities.getLocalAddress(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new byte[0] );
-                MessagingService.instance().sendOneWay(doneMessage, message.getFrom());
-            }
-            catch ( IOException ex )
-            {
-                logger.info(LogUtil.throwableToString(ex));
-            }
-        }
-
-        String getNewFileNameFromOldContextAndNames(Map<String, String> fileNames,
-                StreamContextManager.StreamContext streamContext)
-        {
-            File sourceFile = new File( streamContext.getTargetFile() );
-            String[] piece = FBUtilities.strip(sourceFile.getName(), "-");
-            String cfName = piece[0];
-            String ssTableNum = piece[1];
-            String typeOfFile = piece[2];
-
-            String newFileNameExpanded = fileNames.get( streamContext.getTable() + "-" + cfName + "-" + ssTableNum );
-            //Drop type (Data.db) from new FileName
-            String newFileName = newFileNameExpanded.replace("Data.db", typeOfFile);
-            return DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName;
-        }
-
-        Map<String, String> getNewNames(StreamContextManager.StreamContext[] streamContexts) throws IOException
-        {
-            /*
-             * Mapping for each file with unique CF-i ---> new file name. For eg.
-             * for a file with name <CF>-<i>-Data.db there is a corresponding
-             * <CF>-<i>-Index.db. We maintain a mapping from <CF>-<i> to a newly
-             * generated file name.
-            */
-            Map<String, String> fileNames = new HashMap<String, String>();
-            /* Get the distinct entries from StreamContexts i.e have one entry per Data/Index/Filter file set */
-            Set<String> distinctEntries = new HashSet<String>();
-            for ( StreamContextManager.StreamContext streamContext : streamContexts )
-            {
-                String[] pieces = FBUtilities.strip(new File(streamContext.getTargetFile()).getName(), "-");
-                distinctEntries.add(streamContext.getTable() + "-" + pieces[0] + "-" + pieces[1] );
-            }
-
-            /* Generate unique file names per entry */
-            for ( String distinctEntry : distinctEntries )
-            {
-                String tableName;
-                String[] pieces = FBUtilities.strip(distinctEntry, "-");
-                tableName = pieces[0];
-                Table table = Table.open( tableName );
-
-                ColumnFamilyStore cfStore = table.getColumnFamilyStore(pieces[1]);
-                if (logger.isDebugEnabled())
-                  logger.debug("Generating file name for " + distinctEntry + " ...");
-                fileNames.put(distinctEntry, cfStore.getTempSSTableFileName());
-            }
-
-            return fileNames;
-        }
-
-        private void addStreamContext(InetAddress host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
-        {
-            if (logger.isDebugEnabled())
-              logger.debug("Adding stream context " + streamContext + " for " + host + " ...");
-            StreamContextManager.addStreamContext(host, streamContext, streamStatus);
-        }
-    }
-
-    /**
-     * This is the callback handler that is invoked when we have
-     * completely been bootstrapped for a single file by a remote host.
-     *
-     * TODO if we move this into CFS we could make addSSTables private, improving encapsulation.
-    */
-    private static class BootstrapCompletionHandler implements IStreamComplete
-    {
-        public void onStreamCompletion(InetAddress host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException
-        {
-            /* Parse the stream context and the file to the list of SSTables in the associated Column Family Store. */
-            if (streamContext.getTargetFile().contains("-Data.db"))
-            {
-                String tableName = streamContext.getTable();
-                File file = new File( streamContext.getTargetFile() );
-                String fileName = file.getName();
-                String [] temp = fileName.split("-");
-
-                //Open the file to see if all parts are now here
-                try
-                {
-                    SSTableReader sstable = SSTableWriter.renameAndOpen(streamContext.getTargetFile());
-                    //TODO add a sanity check that this sstable has all its parts and is ok
-                    Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
-                    logger.info("Bootstrap added " + sstable.getFilename());
-                }
-                catch (IOException e)
-                {
-                    logger.error("Not able to bootstrap with file " + streamContext.getTargetFile(), e);
-                }
-            }
-
-            if (logger.isDebugEnabled())
-              logger.debug("Sending a bootstrap terminate message with " + streamStatus + " to " + host);
-            /* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
-            StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
-            Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
-            MessagingService.instance().sendOneWay(message, host);
-            /* If we're done with everything for this host, remove from bootstrap sources */
-            if (StreamContextManager.isDone(host))
-                StorageService.instance().removeBootstrapSource(host);
-        }
-    }
-
-    public static class BootstrapTerminateVerbHandler implements IVerbHandler
-    {
-        private static Logger logger_ = Logger.getLogger( BootstrapTerminateVerbHandler.class );
-
-        public void doVerb(Message message)
-        {
-            byte[] body = message.getMessageBody();
-            DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(body, body.length);
-
-            try
-            {
-                StreamContextManager.StreamStatusMessage streamStatusMessage = StreamContextManager.StreamStatusMessage.serializer().deserialize(bufIn);
-                StreamContextManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
-
-                switch( streamStatus.getAction() )
-                {
-                    case DELETE:
-                        StreamManager.instance(message.getFrom()).finish(streamStatus.getFile());
-                        break;
-
-                    case STREAM:
-                        if (logger_.isDebugEnabled())
-                          logger_.debug("Need to re-stream file " + streamStatus.getFile());
-                        StreamManager.instance(message.getFrom()).repeat();
-                        break;
-
-                    default:
-                        break;
-                }
-            }
-            catch ( IOException ex )
-            {
-                logger_.info(LogUtil.throwableToString(ex));
-            }
-        }
-    }
 }

Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java (from r834937, incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java&r1=834937&r2=834939&rev=834939&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java Wed Nov 11 16:11:44 2009
@@ -30,31 +30,31 @@
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
-public class BootstrapInitiateMessage implements Serializable
+public class StreamInitiateMessage implements Serializable
 {
-    private static ICompactSerializer<BootstrapInitiateMessage> serializer_;
-    
+    private static ICompactSerializer<StreamInitiateMessage> serializer_;
+
     static
     {
-        serializer_ = new BootstrapInitiateMessageSerializer();
+        serializer_ = new StreamInitiateMessageSerializer();
     }
     
-    public static ICompactSerializer<BootstrapInitiateMessage> serializer()
+    public static ICompactSerializer<StreamInitiateMessage> serializer()
     {
         return serializer_;
     }
     
-    public static Message makeBootstrapInitiateMessage(BootstrapInitiateMessage biMessage) throws IOException
+    public static Message makeStreamInitiateMessage(StreamInitiateMessage biMessage) throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
-        BootstrapInitiateMessage.serializer().serialize(biMessage, dos);
-        return new Message(FBUtilities.getLocalAddress(), "", StorageService.bootStrapInitiateVerbHandler_, bos.toByteArray() );
+        StreamInitiateMessage.serializer().serialize(biMessage, dos);
+        return new Message(FBUtilities.getLocalAddress(), "", StorageService.streamInitiateVerbHandler_, bos.toByteArray() );
     }
     
     protected StreamContextManager.StreamContext[] streamContexts_ = new StreamContextManager.StreamContext[0];
    
-    public BootstrapInitiateMessage(StreamContextManager.StreamContext[] streamContexts)
+    public StreamInitiateMessage(StreamContextManager.StreamContext[] streamContexts)
     {
         streamContexts_ = streamContexts;
     }
@@ -65,9 +65,9 @@
     }
 }
 
-class BootstrapInitiateMessageSerializer implements ICompactSerializer<BootstrapInitiateMessage>
+class StreamInitiateMessageSerializer implements ICompactSerializer<StreamInitiateMessage>
 {
-    public void serialize(BootstrapInitiateMessage bim, DataOutputStream dos) throws IOException
+    public void serialize(StreamInitiateMessage bim, DataOutputStream dos) throws IOException
     {
         dos.writeInt(bim.streamContexts_.length);
         for ( StreamContextManager.StreamContext streamContext : bim.streamContexts_ )
@@ -76,7 +76,7 @@
         }
     }
     
-    public BootstrapInitiateMessage deserialize(DataInputStream dis) throws IOException
+    public StreamInitiateMessage deserialize(DataInputStream dis) throws IOException
     {
         int size = dis.readInt();
         StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[0];
@@ -88,7 +88,7 @@
                 streamContexts[i] = StreamContextManager.StreamContext.serializer().deserialize(dis);
             }
         }
-        return new BootstrapInitiateMessage(streamContexts);
+        return new StreamInitiateMessage(streamContexts);
     }
 }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=834939&r1=834938&r2=834939&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java Wed Nov 11 16:11:44 2009
@@ -1,32 +1,35 @@
 package org.apache.cassandra.io;
 
 import java.net.InetAddress;
-import java.util.Collection;
-import java.util.List;
-import java.util.ArrayList;
+import java.util.*;
 import java.io.IOException;
 import java.io.File;
+import java.io.IOError;
 
 import org.apache.log4j.Logger;
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.BootstrapInitiateMessage;
+import org.apache.cassandra.dht.StreamInitiateMessage;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.net.io.IStreamComplete;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.service.StreamManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LogUtil;
 
 public class Streaming
 {
     private static Logger logger = Logger.getLogger(Streaming.class);
 
-    /*
-     * This method needs to figure out the files on disk
-     * locally for each range and then stream them using
-     * the Bootstrap protocol to the target endpoint.
+    /**
+     * split out files on disk locally for each range and then stream them to the target endpoint
     */
     public static void transferRanges(InetAddress target, Collection<Range> ranges) throws IOException
     {
@@ -36,12 +39,9 @@
             logger.debug("Beginning transfer process to " + target + " for ranges " + StringUtils.join(ranges, ", "));
 
         /*
-         * (1) First we dump all the memtables to disk.
-         * (2) Run a version of compaction which will basically
-         *     put the keys in the range specified into a directory
-         *     named as per the endpoint it is destined for inside the
-         *     bootstrap directory.
-         * (3) Handoff the data.
+         * (1) dump all the memtables to disk.
+         * (2) anticompaction -- split out the keys in the range specified
+         * (3) transfer the data.
         */
         List<String> tables = DatabaseDescriptor.getTables();
         for (String tName : tables)
@@ -62,10 +62,6 @@
         }
     }
 
-    /**
-     * Stream the files in the bootstrap directory over to the
-     * node being bootstrapped.
-    */
     private static void transferOneTable(InetAddress target, List<String> fileList, String table) throws IOException
     {
         if (fileList.isEmpty())
@@ -81,13 +77,11 @@
               logger.debug("Stream context metadata " + streamContexts[i]);
         }
 
-        /* Set up the stream manager with the files that need to streamed */
         StreamManager.instance(target).addFilesToStream(streamContexts);
-        /* Send the bootstrap initiate message */
-        BootstrapInitiateMessage biMessage = new BootstrapInitiateMessage(streamContexts);
-        Message message = BootstrapInitiateMessage.makeBootstrapInitiateMessage(biMessage);
+        StreamInitiateMessage biMessage = new StreamInitiateMessage(streamContexts);
+        Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
         if (logger.isDebugEnabled())
-          logger.debug("Sending a bootstrap initiate message to " + target + " ...");
+          logger.debug("Sending a stream initiate message to " + target + " ...");
         MessagingService.instance().sendOneWay(message, target);
         if (logger.isDebugEnabled())
           logger.debug("Waiting for transfer to " + target + " to complete");
@@ -95,4 +89,202 @@
         if (logger.isDebugEnabled())
           logger.debug("Done with transfer to " + target);
     }
+
+    public static class StreamInitiateVerbHandler implements IVerbHandler
+    {
+        /*
+         * Here we handle the StreamInitiateMessage. Here we get the
+         * array of StreamContexts. We get file names for the column
+         * families associated with the files and replace them with the
+         * file names as obtained from the column family store on the
+         * receiving end.
+        */
+        public void doVerb(Message message)
+        {
+            byte[] body = message.getMessageBody();
+            DataInputBuffer bufIn = new DataInputBuffer();
+            bufIn.reset(body, body.length);
+
+            try
+            {
+                StreamInitiateMessage biMsg = StreamInitiateMessage.serializer().deserialize(bufIn);
+                StreamContextManager.StreamContext[] streamContexts = biMsg.getStreamContext();
+
+                Map<String, String> fileNames = getNewNames(streamContexts);
+                /*
+                 * For each of stream context's in the incoming message
+                 * generate the new file names and store the new file names
+                 * in the StreamContextManager.
+                */
+                for (StreamContextManager.StreamContext streamContext : streamContexts )
+                {
+                    StreamContextManager.StreamStatus streamStatus = new StreamContextManager.StreamStatus(streamContext.getTargetFile(), streamContext.getExpectedBytes() );
+                    String file = getNewFileNameFromOldContextAndNames(fileNames, streamContext);
+
+                    //String file = DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName + "-Data.db";
+                    if (logger.isDebugEnabled())
+                      logger.debug("Received Data from  : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
+                    streamContext.setTargetFile(file);
+                    addStreamContext(message.getFrom(), streamContext, streamStatus);
+                }
+
+                StreamContextManager.registerStreamCompletionHandler(message.getFrom(), new StreamCompletionHandler());
+                if (logger.isDebugEnabled())
+                  logger.debug("Sending a stream initiate done message ...");
+                Message doneMessage = new Message(FBUtilities.getLocalAddress(), "", StorageService.streamInitiateDoneVerbHandler_, new byte[0] );
+                MessagingService.instance().sendOneWay(doneMessage, message.getFrom());
+            }
+            catch ( IOException ex )
+            {
+                logger.info(LogUtil.throwableToString(ex));
+            }
+        }
+
+        public String getNewFileNameFromOldContextAndNames(Map<String, String> fileNames,
+                StreamContextManager.StreamContext streamContext)
+        {
+            File sourceFile = new File( streamContext.getTargetFile() );
+            String[] piece = FBUtilities.strip(sourceFile.getName(), "-");
+            String cfName = piece[0];
+            String ssTableNum = piece[1];
+            String typeOfFile = piece[2];
+
+            String newFileNameExpanded = fileNames.get( streamContext.getTable() + "-" + cfName + "-" + ssTableNum );
+            //Drop type (Data.db) from new FileName
+            String newFileName = newFileNameExpanded.replace("Data.db", typeOfFile);
+            return DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName;
+        }
+
+        public Map<String, String> getNewNames(StreamContextManager.StreamContext[] streamContexts) throws IOException
+        {
+            /*
+             * Mapping for each file with unique CF-i ---> new file name. For eg.
+             * for a file with name <CF>-<i>-Data.db there is a corresponding
+             * <CF>-<i>-Index.db. We maintain a mapping from <CF>-<i> to a newly
+             * generated file name.
+            */
+            Map<String, String> fileNames = new HashMap<String, String>();
+            /* Get the distinct entries from StreamContexts i.e have one entry per Data/Index/Filter file set */
+            Set<String> distinctEntries = new HashSet<String>();
+            for ( StreamContextManager.StreamContext streamContext : streamContexts )
+            {
+                String[] pieces = FBUtilities.strip(new File(streamContext.getTargetFile()).getName(), "-");
+                distinctEntries.add(streamContext.getTable() + "-" + pieces[0] + "-" + pieces[1] );
+            }
+
+            /* Generate unique file names per entry */
+            for ( String distinctEntry : distinctEntries )
+            {
+                String tableName;
+                String[] pieces = FBUtilities.strip(distinctEntry, "-");
+                tableName = pieces[0];
+                Table table = Table.open( tableName );
+
+                ColumnFamilyStore cfStore = table.getColumnFamilyStore(pieces[1]);
+                if (logger.isDebugEnabled())
+                  logger.debug("Generating file name for " + distinctEntry + " ...");
+                fileNames.put(distinctEntry, cfStore.getTempSSTableFileName());
+            }
+
+            return fileNames;
+        }
+
+        private void addStreamContext(InetAddress host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
+        {
+            if (logger.isDebugEnabled())
+              logger.debug("Adding stream context " + streamContext + " for " + host + " ...");
+            StreamContextManager.addStreamContext(host, streamContext, streamStatus);
+        }
+    }
+
+    public static class StreamInitiateDoneVerbHandler implements IVerbHandler
+    {
+        public void doVerb(Message message)
+        {
+            if (logger.isDebugEnabled())
+              logger.debug("Received a stream initiate done message ...");
+            StreamManager.instance(message.getFrom()).start();
+        }
+    }
+
+    /**
+     * This is the callback handler that is invoked when we have
+     * completely received a single file from a remote host.
+     *
+     * TODO if we move this into CFS we could make addSSTables private, improving encapsulation.
+    */
+    private static class StreamCompletionHandler implements IStreamComplete
+    {
+        public void onStreamCompletion(InetAddress host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException
+        {
+            /* Parse the stream context and the file to the list of SSTables in the associated Column Family Store. */
+            if (streamContext.getTargetFile().contains("-Data.db"))
+            {
+                String tableName = streamContext.getTable();
+                File file = new File( streamContext.getTargetFile() );
+                String fileName = file.getName();
+                String [] temp = fileName.split("-");
+
+                //Open the file to see if all parts are now here
+                try
+                {
+                    SSTableReader sstable = SSTableWriter.renameAndOpen(streamContext.getTargetFile());
+                    //TODO add a sanity check that this sstable has all its parts and is ok
+                    Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
+                    logger.info("Streaming added " + sstable.getFilename());
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException("Not able to add streamed file " + streamContext.getTargetFile(), e);
+                }
+            }
+
+            if (logger.isDebugEnabled())
+              logger.debug("Sending a streaming finished message with " + streamStatus + " to " + host);
+            /* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
+            StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
+            Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
+            MessagingService.instance().sendOneWay(message, host);
+
+            /* If we're done with everything for this host, remove from bootstrap sources */
+            if (StorageService.instance().isBootstrapMode() && StreamContextManager.isDone(host))
+                StorageService.instance().removeBootstrapSource(host);
+        }
+    }
+
+    public static class StreamFinishedVerbHandler implements IVerbHandler
+    {
+        public void doVerb(Message message)
+        {
+            byte[] body = message.getMessageBody();
+            DataInputBuffer bufIn = new DataInputBuffer();
+            bufIn.reset(body, body.length);
+
+            try
+            {
+                StreamContextManager.StreamStatusMessage streamStatusMessage = StreamContextManager.StreamStatusMessage.serializer().deserialize(bufIn);
+                StreamContextManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
+
+                switch (streamStatus.getAction())
+                {
+                    case DELETE:
+                        StreamManager.instance(message.getFrom()).finish(streamStatus.getFile());
+                        break;
+
+                    case STREAM:
+                        if (logger.isDebugEnabled())
+                            logger.debug("Need to re-stream file " + streamStatus.getFile());
+                        StreamManager.instance(message.getFrom()).repeat();
+                        break;
+
+                    default:
+                        break;
+                }
+            }
+            catch (IOException ex)
+            {
+                throw new IOError(ex);
+            }
+        }
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java?rev=834939&r1=834938&r2=834939&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java Wed Nov 11 16:11:44 2009
@@ -220,7 +220,7 @@
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream( bos );
             StreamStatusMessage.serializer().serialize(streamStatusMessage, dos);
-            return new Message(FBUtilities.getLocalAddress(), "", StorageService.bootStrapTerminateVerbHandler_, bos.toByteArray());
+            return new Message(FBUtilities.getLocalAddress(), "", StorageService.streamFinishedVerbHandler_, bos.toByteArray());
         }
         
         protected StreamContextManager.StreamStatus streamStatus_;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=834939&r1=834938&r2=834939&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Nov 11 16:11:44 2009
@@ -39,6 +39,7 @@
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.Streaming;
 
 import org.apache.log4j.Logger;
 import org.apache.log4j.Level;
@@ -68,9 +69,9 @@
     public final static String binaryVerbHandler_ = "BINARY-VERB-HANDLER";
     public final static String readRepairVerbHandler_ = "READ-REPAIR-VERB-HANDLER";
     public final static String readVerbHandler_ = "ROW-READ-VERB-HANDLER";
-    public final static String bootStrapInitiateVerbHandler_ = "BOOTSTRAP-INITIATE-VERB-HANDLER";
-    public final static String bootStrapInitiateDoneVerbHandler_ = "BOOTSTRAP-INITIATE-DONE-VERB-HANDLER";
-    public final static String bootStrapTerminateVerbHandler_ = "BOOTSTRAP-TERMINATE-VERB-HANDLER";
+    public final static String streamInitiateVerbHandler_ = "BOOTSTRAP-INITIATE-VERB-HANDLER";
+    public final static String streamInitiateDoneVerbHandler_ = "BOOTSTRAP-INITIATE-DONE-VERB-HANDLER";
+    public final static String streamFinishedVerbHandler_ = "BOOTSTRAP-TERMINATE-VERB-HANDLER";
     public final static String dataFileVerbHandler_ = "DATA-FILE-VERB-HANDLER";
     public final static String bootstrapMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
     public final static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
@@ -219,9 +220,9 @@
         // see BootStrapper for a summary of how the bootstrap verbs interact
         MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
         MessagingService.instance().registerVerbHandlers(bootstrapMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );
-        MessagingService.instance().registerVerbHandlers(bootStrapInitiateVerbHandler_, new BootStrapper.BootStrapInitiateVerbHandler());
-        MessagingService.instance().registerVerbHandlers(bootStrapInitiateDoneVerbHandler_, new BootStrapper.BootstrapInitiateDoneVerbHandler());
-        MessagingService.instance().registerVerbHandlers(bootStrapTerminateVerbHandler_, new BootStrapper.BootstrapTerminateVerbHandler());
+        MessagingService.instance().registerVerbHandlers(streamInitiateVerbHandler_, new Streaming.StreamInitiateVerbHandler());
+        MessagingService.instance().registerVerbHandlers(streamInitiateDoneVerbHandler_, new Streaming.StreamInitiateDoneVerbHandler());
+        MessagingService.instance().registerVerbHandlers(streamFinishedVerbHandler_, new Streaming.StreamFinishedVerbHandler());
 
         StageManager.registerStage(StorageService.mutationStage_,
                                    new MultiThreadedStage(StorageService.mutationStage_, DatabaseDescriptor.getConcurrentWriters()));

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java?rev=834939&r1=834938&r2=834939&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java Wed Nov 11 16:11:44 2009
@@ -23,16 +23,10 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 
-import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.dht.*;
-import java.net.InetAddress;
 import org.apache.cassandra.net.io.StreamContextManager;
-import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.Streaming;
 
 import org.junit.Test;
 
@@ -45,15 +39,15 @@
         streamContexts[0] = new StreamContextManager.StreamContext("/foo/Standard1-500-Data.db", 100, "Keyspace1");
         streamContexts[1] = new StreamContextManager.StreamContext("/foo/Standard1-500-Index.db", 100, "Keyspace1");
         streamContexts[2] = new StreamContextManager.StreamContext("/foo/Standard1-500-Filter.db", 100, "Keyspace1");
-        BootStrapper.BootStrapInitiateVerbHandler bivh = new BootStrapper.BootStrapInitiateVerbHandler();
+        Streaming.StreamInitiateVerbHandler bivh = new Streaming.StreamInitiateVerbHandler();
         Map<String, String> fileNames = bivh.getNewNames(streamContexts);
         String result = fileNames.get("Keyspace1-Standard1-500");
         assertEquals(true, result.contains("Standard1"));
         assertEquals(true, result.contains("Data.db"));
         assertEquals(1, fileNames.entrySet().size());
-        
-        assertTrue( new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, streamContexts[0])).getName().matches("Standard1-tmp-\\d+-Data.db"));
-        assertTrue( new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, streamContexts[1])).getName().matches("Standard1-tmp-\\d+-Index.db"));
-        assertTrue( new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, streamContexts[2])).getName().matches("Standard1-tmp-\\d+-Filter.db"));
+
+        assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, streamContexts[0])).getName().matches("Standard1-tmp-\\d+-Data.db"));
+        assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, streamContexts[1])).getName().matches("Standard1-tmp-\\d+-Index.db"));
+        assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, streamContexts[2])).getName().matches("Standard1-tmp-\\d+-Filter.db"));
     }
 }