You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/09 19:07:15 UTC

svn commit: r823616 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/db/ test/unit/org/apache/...

Author: jbellis
Date: Fri Oct  9 17:07:14 2009
New Revision: 823616

URL: http://svn.apache.org/viewvc?rev=823616&view=rev
Log:
move bootstrap-related code into Bootstrapper
patch by jbellis; reviewed by Eric Evans for CASSANDRA-477

Added:
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
      - copied, changed from r823562, incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java
Removed:
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Oct  9 17:07:14 2009
@@ -340,7 +340,7 @@
         return new File(DatabaseDescriptor.getDataFileLocationForTable(table_), fname).getAbsolutePath();
     }
 
-    String getTempSSTableFileName()
+    public String getTempSSTableFileName()
     {
         return String.format("%s-%s-%s-Data.db",
                              columnFamily_, SSTable.TEMPFILE_MARKER, fileIndexGenerator_.incrementAndGet());
@@ -559,7 +559,7 @@
      * param @ filename - filename just flushed to disk
      * param @ bf - bloom filter which indicates the keys that are in this file.
     */
-    void addSSTable(SSTableReader sstable)
+    public void addSSTable(SSTableReader sstable)
     {
         ssTables_.add(sstable);
         CompactionManager.instance().submit(this);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Oct  9 17:07:14 2009
@@ -26,14 +26,11 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.BootstrapInitiateMessage;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.io.SSTableWriter;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.io.IStreamComplete;
@@ -155,163 +152,6 @@
             return FBUtilities.mapToString(tableMetadataMap_);
         }
     }
-
-    /**
-     * This is the callback handler that is invoked when we have
-     * completely been bootstrapped for a single file by a remote host.
-     *
-     * TODO if we move this into CFS we could make addSSTables private, improving encapsulation.
-    */
-    public static class BootstrapCompletionHandler implements IStreamComplete
-    {                
-        public void onStreamCompletion(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException
-        {                        
-            /* Parse the stream context and the file to the list of SSTables in the associated Column Family Store. */
-            if (streamContext.getTargetFile().contains("-Data.db"))
-            {
-                String tableName = streamContext.getTable();
-                File file = new File( streamContext.getTargetFile() );
-                String fileName = file.getName();
-                String [] temp = fileName.split("-");
-                
-                //Open the file to see if all parts are now here
-                SSTableReader sstable = null;
-                try 
-                {
-                    sstable = SSTableWriter.renameAndOpen(streamContext.getTargetFile());
-                    
-                    //TODO add a sanity check that this sstable has all its parts and is ok
-                    Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
-                    logger_.info("Bootstrap added " + sstable.getFilename());
-                }
-                catch (IOException e)
-                {
-                    logger_.error("Not able to bootstrap with file " + streamContext.getTargetFile(), e);                    
-                }
-            }
-            
-            EndPoint to = new EndPoint(host, DatabaseDescriptor.getStoragePort());
-            if (logger_.isDebugEnabled())
-              logger_.debug("Sending a bootstrap terminate message with " + streamStatus + " to " + to);
-            /* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
-            StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
-            Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
-            MessagingService.instance().sendOneWay(message, to);
-            /* If we're done with everything for this host, remove from bootstrap sources */
-            if (StreamContextManager.isDone(to.getHost()))
-                StorageService.instance().removeBootstrapSource(to);
-        }
-    }
-
-    public static class BootStrapInitiateVerbHandler implements IVerbHandler
-    {
-        /*
-         * Here we handle the BootstrapInitiateMessage. Here we get the
-         * array of StreamContexts. We get file names for the column
-         * families associated with the files and replace them with the
-         * file names as obtained from the column family store on the
-         * receiving end.
-        */
-        public void doVerb(Message message)
-        {
-            byte[] body = message.getMessageBody();
-            DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(body, body.length); 
-            
-            try
-            {
-                BootstrapInitiateMessage biMsg = BootstrapInitiateMessage.serializer().deserialize(bufIn);
-                StreamContextManager.StreamContext[] streamContexts = biMsg.getStreamContext();                
-                
-                Map<String, String> fileNames = getNewNames(streamContexts);
-                /*
-                 * For each of stream context's in the incoming message
-                 * generate the new file names and store the new file names
-                 * in the StreamContextManager.
-                */
-                for (StreamContextManager.StreamContext streamContext : streamContexts )
-                {                    
-                    StreamContextManager.StreamStatus streamStatus = new StreamContextManager.StreamStatus(streamContext.getTargetFile(), streamContext.getExpectedBytes() );
-                    String file = getNewFileNameFromOldContextAndNames(fileNames, streamContext);
-                    
-                    //String file = DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName + "-Data.db";
-                    if (logger_.isDebugEnabled())
-                      logger_.debug("Received Data from  : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
-                    streamContext.setTargetFile(file);
-                    addStreamContext(message.getFrom().getHost(), streamContext, streamStatus);                                            
-                }    
-                                             
-                StreamContextManager.registerStreamCompletionHandler(message.getFrom().getHost(), new Table.BootstrapCompletionHandler());
-                /* Send a bootstrap initiation done message to execute on default stage. */
-                if (logger_.isDebugEnabled())
-                  logger_.debug("Sending a bootstrap initiate done message ...");
-                Message doneMessage = new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new byte[0] );
-                MessagingService.instance().sendOneWay(doneMessage, message.getFrom());
-            }
-            catch ( IOException ex )
-            {
-                logger_.info(LogUtil.throwableToString(ex));
-            }
-        }
-        
-        String getNewFileNameFromOldContextAndNames(Map<String, String> fileNames,
-                StreamContextManager.StreamContext streamContext)
-        {
-            File sourceFile = new File( streamContext.getTargetFile() );
-            String[] piece = FBUtilities.strip(sourceFile.getName(), "-");
-            String cfName = piece[0];
-            String ssTableNum = piece[1];
-            String typeOfFile = piece[2];             
-
-            String newFileNameExpanded = fileNames.get( streamContext.getTable() + "-" + cfName + "-" + ssTableNum );
-            //Drop type (Data.db) from new FileName
-            String newFileName = newFileNameExpanded.replace("Data.db", typeOfFile);
-            String file = DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName ;
-            return file;
-        }
-
-        Map<String, String> getNewNames(StreamContextManager.StreamContext[] streamContexts) throws IOException
-        {
-            /* 
-             * Mapping for each file with unique CF-i ---> new file name. For eg.
-             * for a file with name <CF>-<i>-Data.db there is a corresponding
-             * <CF>-<i>-Index.db. We maintain a mapping from <CF>-<i> to a newly
-             * generated file name.
-            */
-            Map<String, String> fileNames = new HashMap<String, String>();
-            /* Get the distinct entries from StreamContexts i.e have one entry per Data/Index/Filter file set */
-            Set<String> distinctEntries = new HashSet<String>();
-            for ( StreamContextManager.StreamContext streamContext : streamContexts )
-            {
-                String[] pieces = FBUtilities.strip(new File(streamContext.getTargetFile()).getName(), "-");
-                distinctEntries.add(streamContext.getTable() + "-" + pieces[0] + "-" + pieces[1] );
-            }
-            
-            /* Generate unique file names per entry */
-            for ( String distinctEntry : distinctEntries )
-            {
-                String tableName;
-                String[] peices = FBUtilities.strip(distinctEntry, "-");
-                tableName = peices[0];
-                Table table = Table.open( tableName );
-                Map<String, ColumnFamilyStore> columnFamilyStores = table.getColumnFamilyStores();
-
-                ColumnFamilyStore cfStore = columnFamilyStores.get(peices[1]);
-                if (logger_.isDebugEnabled())
-                  logger_.debug("Generating file name for " + distinctEntry + " ...");
-                fileNames.put(distinctEntry, cfStore.getTempSSTableFileName());
-            }
-            
-            return fileNames;
-        }
-
-        private void addStreamContext(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
-        {
-            if (logger_.isDebugEnabled())
-              logger_.debug("Adding stream context " + streamContext + " for " + host + " ...");
-            StreamContextManager.addStreamContext(host, streamContext, streamStatus);
-        }
-    }
     
     /* Used to lock the factory for creation of Table instance */
     private static final Lock createLock_ = new ReentrantLock();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Fri Oct  9 17:07:14 2009
@@ -26,24 +26,54 @@
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
+ import java.util.concurrent.locks.Condition;
+ import java.util.concurrent.ExecutorService;
+ import java.io.IOException;
+ import java.io.UnsupportedEncodingException;
+ import java.io.File;
 
  import org.apache.log4j.Logger;
 
+ import org.apache.commons.lang.ArrayUtils;
+
  import org.apache.cassandra.locator.TokenMetadata;
- import org.apache.cassandra.net.EndPoint;
+ import org.apache.cassandra.net.*;
+ import org.apache.cassandra.net.io.StreamContextManager;
+ import org.apache.cassandra.net.io.IStreamComplete;
  import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.LogUtil;
-
-
-/**
- * This class handles the bootstrapping responsibilities for
- * any new endpoint.
-*/
+ import org.apache.cassandra.service.StorageLoadBalancer;
+ import org.apache.cassandra.service.StreamManager;
+ import org.apache.cassandra.utils.LogUtil;
+ import org.apache.cassandra.utils.SimpleCondition;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.gms.Gossiper;
+ import org.apache.cassandra.gms.ApplicationState;
+ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+ import org.apache.cassandra.io.DataInputBuffer;
+ import org.apache.cassandra.io.SSTableReader;
+ import org.apache.cassandra.io.SSTableWriter;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Table;
+
+
+ /**
+  * This class handles the bootstrapping responsibilities for the local endpoint.
+  *
+  *  - bootstrapTokenVerb asks the most-loaded node what Token to use to split its Range in two.
+  *  - bootstrapMetadataVerb tells source nodes to send us the necessary Ranges
+  *  - source nodes send bootStrapInitiateVerb to us to say "get ready to receive data" [if there is data to send]
+  *  - when we have everything set up to receive the data, we send bootStrapInitiateDoneVerb back to the source nodes and they start streaming
+  *  - when streaming is complete, we send bootStrapTerminateVerb to the source so it can clean up on its end
+  */
 public class BootStrapper implements Runnable
 {
     public static final long INITIAL_DELAY = 30 * 1000; //ms
 
-    private static Logger logger_ = Logger.getLogger(BootStrapper.class);
+    static final Logger logger_ = Logger.getLogger(BootStrapper.class);
+
+    /* This thread pool is used to do the bootstrap for a new node */
+    private static final ExecutorService bootstrapExecutor_ = new DebuggableThreadPoolExecutor("BOOT-STRAPPER");
 
     /* endpoints that need to be bootstrapped */
     protected EndPoint[] targets_ = new EndPoint[0];
@@ -141,4 +171,312 @@
         return rangesWithSourceTarget;
     }
 
+    private static Token<?> getBootstrapTokenFrom(EndPoint maxEndpoint)
+    {
+        Message message = new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.bootstrapTokenVerbHandler_, ArrayUtils.EMPTY_BYTE_ARRAY);
+        BootstrapTokenCallback btc = new BootstrapTokenCallback();
+        MessagingService.instance().sendRR(message, maxEndpoint, btc);
+        return btc.getToken();
+    }
+
+    public static void startBootstrap() throws IOException
+    {
+        logger_.info("Starting in bootstrap mode (first, sleeping to get load information)");
+
+        StorageService ss = StorageService.instance();
+        StorageLoadBalancer slb = StorageLoadBalancer.instance();
+
+        slb.waitForLoadInfo();
+
+        // if initialtoken was specified, use that.  otherwise, pick a token to assume half the load of the most-loaded node.
+        if (DatabaseDescriptor.getInitialToken() == null)
+        {
+            double maxLoad = 0;
+            EndPoint maxEndpoint = null;
+            for (Map.Entry<EndPoint,Double> entry : slb.getLoadInfo().entrySet())
+            {
+                if (maxEndpoint == null || entry.getValue() > maxLoad)
+                {
+                    maxEndpoint = entry.getKey();
+                    maxLoad = entry.getValue();
+                }
+            }
+            if (maxEndpoint == null)
+            {
+                throw new RuntimeException("No bootstrap sources found");
+            }
+
+            if (!maxEndpoint.equals(StorageService.getLocalStorageEndPoint()))
+            {
+                Token<?> t = getBootstrapTokenFrom(maxEndpoint);
+                logger_.info("Setting token to " + t + " to assume load from " + maxEndpoint.getHost());
+                ss.updateToken(t);
+            }
+        }
+
+        BootStrapper bs = new BootStrapper(new EndPoint[] {StorageService.getLocalStorageEndPoint()}, ss.getLocalToken());
+        bootstrapExecutor_.submit(bs);
+        Gossiper.instance().addApplicationState(StorageService.BOOTSTRAP_MODE, new ApplicationState(""));
+    }
+
+    public static class BootstrapTokenVerbHandler implements IVerbHandler
+    {
+        public void doVerb(Message message)
+        {
+            StorageService ss = StorageService.instance();
+            List<String> tokens = ss.getSplits(2);
+            assert tokens.size() == 3 : tokens.size();
+            Message response;
+            try
+            {
+                response = message.getReply(ss.getLocalStorageEndPoint(), tokens.get(1).getBytes("UTF-8"));
+            }
+            catch (UnsupportedEncodingException e)
+            {
+                throw new AssertionError();
+            }
+            MessagingService.instance().sendOneWay(response, message.getFrom());
+        }
+    }
+
+    public static class BootstrapInitiateDoneVerbHandler implements IVerbHandler
+    {
+        public void doVerb(Message message)
+        {
+            if (logger_.isDebugEnabled())
+              logger_.debug("Received a bootstrap initiate done message ...");
+            /* Let the Stream Manager do his thing. */
+            StreamManager.instance(message.getFrom()).start();
+        }
+    }
+
+    private static class BootstrapTokenCallback implements IAsyncCallback
+    {
+        private volatile Token<?> token;
+        private final Condition condition = new SimpleCondition();
+
+        public Token<?> getToken()
+        {
+            try
+            {
+                condition.await();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+            return token;
+        }
+
+        public void response(Message msg)
+        {
+            try
+            {
+                token = StorageService.getPartitioner().getTokenFactory().fromString(new String(msg.getMessageBody(), "UTF-8"));
+            }
+            catch (UnsupportedEncodingException e)
+            {
+                throw new AssertionError();
+            }
+            condition.signalAll();
+        }
+
+        public void attachContext(Object o)
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    public static class BootStrapInitiateVerbHandler implements IVerbHandler
+    {
+        /*
+         * Here we handle the BootstrapInitiateMessage. Here we get the
+         * array of StreamContexts. We get file names for the column
+         * families associated with the files and replace them with the
+         * file names as obtained from the column family store on the
+         * receiving end.
+        */
+        public void doVerb(Message message)
+        {
+            byte[] body = message.getMessageBody();
+            DataInputBuffer bufIn = new DataInputBuffer();
+            bufIn.reset(body, body.length);
+
+            try
+            {
+                BootstrapInitiateMessage biMsg = BootstrapInitiateMessage.serializer().deserialize(bufIn);
+                StreamContextManager.StreamContext[] streamContexts = biMsg.getStreamContext();
+
+                Map<String, String> fileNames = getNewNames(streamContexts);
+                /*
+                 * For each of stream context's in the incoming message
+                 * generate the new file names and store the new file names
+                 * in the StreamContextManager.
+                */
+                for (StreamContextManager.StreamContext streamContext : streamContexts )
+                {
+                    StreamContextManager.StreamStatus streamStatus = new StreamContextManager.StreamStatus(streamContext.getTargetFile(), streamContext.getExpectedBytes() );
+                    String file = getNewFileNameFromOldContextAndNames(fileNames, streamContext);
+
+                    //String file = DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName + "-Data.db";
+                    if (logger_.isDebugEnabled())
+                      logger_.debug("Received Data from  : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
+                    streamContext.setTargetFile(file);
+                    addStreamContext(message.getFrom().getHost(), streamContext, streamStatus);
+                }
+
+                StreamContextManager.registerStreamCompletionHandler(message.getFrom().getHost(), new BootstrapCompletionHandler());
+                /* Send a bootstrap initiation done message to execute on default stage. */
+                if (logger_.isDebugEnabled())
+                  logger_.debug("Sending a bootstrap initiate done message ...");
+                Message doneMessage = new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new byte[0] );
+                MessagingService.instance().sendOneWay(doneMessage, message.getFrom());
+            }
+            catch ( IOException ex )
+            {
+                logger_.info(LogUtil.throwableToString(ex));
+            }
+        }
+
+        String getNewFileNameFromOldContextAndNames(Map<String, String> fileNames,
+                StreamContextManager.StreamContext streamContext)
+        {
+            File sourceFile = new File( streamContext.getTargetFile() );
+            String[] piece = FBUtilities.strip(sourceFile.getName(), "-");
+            String cfName = piece[0];
+            String ssTableNum = piece[1];
+            String typeOfFile = piece[2];
+
+            String newFileNameExpanded = fileNames.get( streamContext.getTable() + "-" + cfName + "-" + ssTableNum );
+            //Drop type (Data.db) from new FileName
+            String newFileName = newFileNameExpanded.replace("Data.db", typeOfFile);
+            return DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName;
+        }
+
+        Map<String, String> getNewNames(StreamContextManager.StreamContext[] streamContexts) throws IOException
+        {
+            /*
+             * Mapping for each file with unique CF-i ---> new file name. For eg.
+             * for a file with name <CF>-<i>-Data.db there is a corresponding
+             * <CF>-<i>-Index.db. We maintain a mapping from <CF>-<i> to a newly
+             * generated file name.
+            */
+            Map<String, String> fileNames = new HashMap<String, String>();
+            /* Get the distinct entries from StreamContexts i.e have one entry per Data/Index/Filter file set */
+            Set<String> distinctEntries = new HashSet<String>();
+            for ( StreamContextManager.StreamContext streamContext : streamContexts )
+            {
+                String[] pieces = FBUtilities.strip(new File(streamContext.getTargetFile()).getName(), "-");
+                distinctEntries.add(streamContext.getTable() + "-" + pieces[0] + "-" + pieces[1] );
+            }
+
+            /* Generate unique file names per entry */
+            for ( String distinctEntry : distinctEntries )
+            {
+                String tableName;
+                String[] pieces = FBUtilities.strip(distinctEntry, "-");
+                tableName = pieces[0];
+                Table table = Table.open( tableName );
+
+                ColumnFamilyStore cfStore = table.getColumnFamilyStore(pieces[1]);
+                if (logger_.isDebugEnabled())
+                  logger_.debug("Generating file name for " + distinctEntry + " ...");
+                fileNames.put(distinctEntry, cfStore.getTempSSTableFileName());
+            }
+
+            return fileNames;
+        }
+
+        private void addStreamContext(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
+        {
+            if (logger_.isDebugEnabled())
+              logger_.debug("Adding stream context " + streamContext + " for " + host + " ...");
+            StreamContextManager.addStreamContext(host, streamContext, streamStatus);
+        }
+    }
+
+    /**
+     * This is the callback handler that is invoked when we have
+     * completely been bootstrapped for a single file by a remote host.
+     *
+     * TODO if we move this into CFS we could make addSSTables private, improving encapsulation.
+    */
+    private static class BootstrapCompletionHandler implements IStreamComplete
+    {
+        public void onStreamCompletion(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException
+        {
+            /* Parse the stream context and the file to the list of SSTables in the associated Column Family Store. */
+            if (streamContext.getTargetFile().contains("-Data.db"))
+            {
+                String tableName = streamContext.getTable();
+                File file = new File( streamContext.getTargetFile() );
+                String fileName = file.getName();
+                String [] temp = fileName.split("-");
+
+                //Open the file to see if all parts are now here
+                SSTableReader sstable = null;
+                try
+                {
+                    sstable = SSTableWriter.renameAndOpen(streamContext.getTargetFile());
+
+                    //TODO add a sanity check that this sstable has all its parts and is ok
+                    Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
+                    logger_.info("Bootstrap added " + sstable.getFilename());
+                }
+                catch (IOException e)
+                {
+                    logger_.error("Not able to bootstrap with file " + streamContext.getTargetFile(), e);
+                }
+            }
+
+            EndPoint to = new EndPoint(host, DatabaseDescriptor.getStoragePort());
+            if (logger_.isDebugEnabled())
+              logger_.debug("Sending a bootstrap terminate message with " + streamStatus + " to " + to);
+            /* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
+            StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
+            Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
+            MessagingService.instance().sendOneWay(message, to);
+            /* If we're done with everything for this host, remove from bootstrap sources */
+            if (StreamContextManager.isDone(to.getHost()))
+                StorageService.instance().removeBootstrapSource(to);
+        }
+    }
+
+    public static class BootstrapTerminateVerbHandler implements IVerbHandler
+    {
+        private static Logger logger_ = Logger.getLogger( BootstrapTerminateVerbHandler.class );
+
+        public void doVerb(Message message)
+        {
+            byte[] body = message.getMessageBody();
+            DataInputBuffer bufIn = new DataInputBuffer();
+            bufIn.reset(body, body.length);
+
+            try
+            {
+                StreamContextManager.StreamStatusMessage streamStatusMessage = StreamContextManager.StreamStatusMessage.serializer().deserialize(bufIn);
+                StreamContextManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
+
+                switch( streamStatus.getAction() )
+                {
+                    case DELETE:
+                        StreamManager.instance(message.getFrom()).finish(streamStatus.getFile());
+                        break;
+
+                    case STREAM:
+                        if (logger_.isDebugEnabled())
+                          logger_.debug("Need to re-stream file " + streamStatus.getFile());
+                        StreamManager.instance(message.getFrom()).repeat();
+                        break;
+
+                    default:
+                        break;
+                }
+            }
+            catch ( IOException ex )
+            {
+                logger_.info(LogUtil.throwableToString(ex));
+            }
+        }
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java Fri Oct  9 17:07:14 2009
@@ -53,7 +53,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         BootstrapMetadataMessage.serializer().serialize(bsMetadataMessage, dos);
-        return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bsMetadataVerbHandler_, bos.toByteArray() );
+        return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootstrapMetadataVerbHandler_, bos.toByteArray() );
     }        
     
     protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java Fri Oct  9 17:07:14 2009
@@ -164,18 +164,6 @@
         sendMessagesToBootstrapSources(rangeInfo);
     }
     
-    // TODO: Once we're sure we don't need global bootstrap -- clean this code up 
-    // so it is easier to understand what messages are being sent. Local bootstrap should
-    // look much simpler
-    protected static void assignWorkForLocalBootstrap(Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget) throws IOException
-    {
-        Map<EndPoint, Map<EndPoint, List<Range>>> rangeInfo = getWorkMap(rangesWithSourceTarget);
-        Map<EndPoint, Map<EndPoint, List<Range>>> filteredRanges = filterRangesForTargetEndPoint(rangeInfo,
-                                                                                                 StorageService.getLocalStorageEndPoint());
-        sendMessagesToBootstrapSources(filteredRanges);
-    }
-
-    
     /**
      * This method takes the Src -> (Tgt-> List of ranges) maps and retains those entries 
      * that are relevant to bootstrapping the target endpoint

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Fri Oct  9 17:07:14 2009
@@ -130,8 +130,7 @@
     /* map where key is the endpoint and value is the state associated with the endpoint */
     Map<EndPoint, EndPointState> endPointStateMap_ = new Hashtable<EndPoint, EndPointState>();
 
-    /* private CTOR */
-    Gossiper()
+    private Gossiper()
     {
         aVeryLongTime_ = 259200 * 1000;
         /* register with the Failure Detector for receiving Failure detector events */

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Fri Oct  9 17:07:14 2009
@@ -29,6 +29,7 @@
 import org.apache.cassandra.concurrent.SingleThreadedStage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.BootStrapper;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndPointState;
 import org.apache.cassandra.gms.Gossiper;
@@ -48,7 +49,7 @@
  * keys at an Endpoint. Monitor load information for a 5 minute
  * interval and then do load balancing operations if necessary.
  */
-final class StorageLoadBalancer implements IEndPointStateChangeSubscriber
+public final class StorageLoadBalancer implements IEndPointStateChangeSubscriber
 {
     class LoadBalancer implements Runnable
     {
@@ -70,14 +71,14 @@
             /*
             int threshold = (int)(StorageLoadBalancer.TOPHEAVY_RATIO * averageSystemLoad());
             int myLoad = localLoad();            
-            EndPoint predecessor = storageService_.getPredecessor(StorageService.getLocalStorageEndPoint());
+            EndPoint predecessor = StorageService.instance().getPredecessor(StorageService.getLocalStorageEndPoint());
             if (logger_.isDebugEnabled())
               logger_.debug("Trying to relocate the predecessor " + predecessor);
             boolean value = tryThisNode(myLoad, threshold, predecessor);
             if ( !value )
             {
                 loadInfo2_.remove(predecessor);
-                EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
+                EndPoint successor = StorageService.instance().getSuccessor(StorageService.getLocalStorageEndPoint());
                 if (logger_.isDebugEnabled())
                   logger_.debug("Trying to relocate the successor " + successor);
                 value = tryThisNode(myLoad, threshold, successor);
@@ -165,6 +166,16 @@
         }
     }
 
+    private static final long BROADCAST_INTERVAL = 5 * 60 * 1000L;
+
+    private static StorageLoadBalancer instance_;
+
+    public static synchronized StorageLoadBalancer instance()
+    {
+        return instance_ == null ? (instance_ = new StorageLoadBalancer()) : instance_;
+    }
+
+
     private static final Logger logger_ = Logger.getLogger(StorageLoadBalancer.class);
     private static final String lbStage_ = "LOAD-BALANCER-STAGE";
     private static final String moveMessageVerbHandler_ = "MOVE-MESSAGE-VERB-HANDLER";
@@ -173,7 +184,6 @@
     /* If a node's load is this factor more than the average, it is considered Heavy */
     private static final double TOPHEAVY_RATIO = 1.5;
 
-    private StorageService storageService_;
     /* this indicates whether this node is already helping someone else */
     private AtomicBoolean isMoveable_ = new AtomicBoolean(false);
     private Map<EndPoint, Double> loadInfo_ = new HashMap<EndPoint, Double>();
@@ -184,18 +194,13 @@
     /* This thread pool is used by target node to leave the ring. */
     private ExecutorService lbOperations_ = new DebuggableThreadPoolExecutor("LB-TARGET");
 
-    StorageLoadBalancer(StorageService storageService)
+    /* Timer is used to disseminate load information */
+    private Timer loadTimer_ = new Timer(false);
+
+    private StorageLoadBalancer()
     {
-        storageService_ = storageService;
-        /* register the load balancer stage */
         StageManager.registerStage(StorageLoadBalancer.lbStage_, new SingleThreadedStage(StorageLoadBalancer.lbStage_));
-        /* register the load balancer verb handler */
         MessagingService.instance().registerVerbHandlers(StorageLoadBalancer.moveMessageVerbHandler_, new MoveMessageVerbHandler());
-    }
-
-    public void start()
-    {
-        /* Register with the Gossiper for EndPointState notifications */
         Gossiper.instance().register(this);
     }
 
@@ -227,7 +232,7 @@
         if ( !isMoveable_.get() )
             return false;
         int myload = localLoad();
-        EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
+        EndPoint successor = StorageService.instance().getSuccessor(StorageService.getLocalStorageEndPoint());
         LoadInfo li = loadInfo2_.get(successor);
         // "load" is NULL means that the successor node has not
         // yet gossiped its load information. We should return
@@ -290,7 +295,7 @@
         }
         else
         {
-            EndPoint successor = storageService_.getSuccessor(target);
+            EndPoint successor = StorageService.instance().getSuccessor(target);
             double sLoad = loadInfo2_.get(successor);
             double targetLoad = loadInfo2_.get(target);
             return (sLoad + targetLoad) <= threshold;
@@ -299,11 +304,11 @@
 
     private boolean isANeighbour(EndPoint neighbour)
     {
-        EndPoint predecessor = storageService_.getPredecessor(StorageService.getLocalStorageEndPoint());
+        EndPoint predecessor = StorageService.instance().getPredecessor(StorageService.getLocalStorageEndPoint());
         if ( predecessor.equals(neighbour) )
             return true;
 
-        EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
+        EndPoint successor = StorageService.instance().getSuccessor(StorageService.getLocalStorageEndPoint());
         if ( successor.equals(neighbour) )
             return true;
 
@@ -343,6 +348,31 @@
     {
         return loadInfo_;
     }
+
+    public void startBroadcasting()
+    {
+        /* starts a load timer thread */
+        loadTimer_.schedule(new LoadDisseminator(), BROADCAST_INTERVAL, BROADCAST_INTERVAL);
+    }
+
+    /** wait for node information to be available.  if the rest of the cluster just came up,
+        this could be up to threshold_ ms (currently 5 minutes). */
+    public void waitForLoadInfo()
+    {
+        try
+        {
+            while (loadInfo_.isEmpty())
+            {
+                Thread.sleep(100);
+            }
+            // one more sleep in case there are some stragglers
+            Thread.sleep(BootStrapper.INITIAL_DELAY);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
 }
 
 class MoveMessage implements Serializable

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Oct  9 17:07:14 2009
@@ -22,15 +22,11 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
+import javax.management.*;
 
 import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -43,12 +39,10 @@
 import org.apache.cassandra.tools.MembershipCleanerVerbHandler;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.utils.SimpleCondition;
 import org.apache.cassandra.io.SSTableReader;
 
 import org.apache.log4j.Logger;
 import org.apache.log4j.Level;
-import org.apache.commons.lang.ArrayUtils;
 
 /*
  * This abstraction contains the token/identifier of this node
@@ -60,9 +54,7 @@
 {
     private static Logger logger_ = Logger.getLogger(StorageService.class);     
     private final static String nodeId_ = "NODE-IDENTIFIER";
-    private final static String BOOTSTRAP_MODE = "BOOTSTRAP-MODE";
-    /* Gossip load after every 5 mins. */
-    private static final long threshold_ = 5 * 60 * 1000L;
+    public final static String BOOTSTRAP_MODE = "BOOTSTRAP-MODE";
     
     /* All stage identifiers */
     public final static String mutationStage_ = "ROW-MUTATION-STAGE";
@@ -79,14 +71,16 @@
     public final static String bootStrapTerminateVerbHandler_ = "BOOTSTRAP-TERMINATE-VERB-HANDLER";
     public final static String dataFileVerbHandler_ = "DATA-FILE-VERB-HANDLER";
     public final static String mbrshipCleanerVerbHandler_ = "MBRSHIP-CLEANER-VERB-HANDLER";
-    public final static String bsMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
+    public final static String bootstrapMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
     public final static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
     public final static String bootstrapTokenVerbHandler_ = "SPLITS-VERB-HANDLER";
 
-    private static volatile StorageService instance_;
     private static EndPoint tcpAddr_;
     private static EndPoint udpAddr_;
-    private static IPartitioner partitioner_;
+    private static IPartitioner partitioner_ = DatabaseDescriptor.getPartitioner();
+
+
+    private static volatile StorageService instance_;
 
     public static EndPoint getLocalStorageEndPoint()
     {
@@ -112,25 +106,6 @@
         return getPrimaryRangeForEndPoint(getLocalStorageEndPoint());
     }
 
-    static
-    {
-        partitioner_ = DatabaseDescriptor.getPartitioner();
-    }
-
-
-    public static class BootstrapInitiateDoneVerbHandler implements IVerbHandler
-    {
-        private static Logger logger_ = Logger.getLogger( BootstrapInitiateDoneVerbHandler.class );
-
-        public void doVerb(Message message)
-        {
-            if (logger_.isDebugEnabled())
-              logger_.debug("Received a bootstrap initiate done message ...");
-            /* Let the Stream Manager do his thing. */
-            StreamManager.instance(message.getFrom()).start();            
-        }
-    }
-
     /*
      * Factory method that gets an instance of the StorageService
      * class.
@@ -170,17 +145,14 @@
     private TokenMetadata tokenMetadata_ = new TokenMetadata();
     private SystemTable.StorageMetadata storageMetadata_;
 
-    /* Timer is used to disseminate load information */
-    private Timer loadTimer_ = new Timer(false);
-
-    /* This thread pool is used to do the bootstrap for a new node */
-    private ExecutorService bootStrapper_ = new DebuggableThreadPoolExecutor("BOOT-STRAPPER");
-    
     /* This thread pool does consistency checks when the client doesn't care about consistency */
-    private ExecutorService consistencyManager_;
+    private ExecutorService consistencyManager_ = new DebuggableThreadPoolExecutor(DatabaseDescriptor.getConsistencyThreads(),
+                                                                                   DatabaseDescriptor.getConsistencyThreads(),
+                                                                                   Integer.MAX_VALUE,
+                                                                                   TimeUnit.SECONDS,
+                                                                                   new LinkedBlockingQueue<Runnable>(),
+                                                                                   new NamedThreadFactory("CONSISTENCY-MANAGER"));
 
-    /* This is the entity that tracks load information of all nodes in the cluster */
-    private StorageLoadBalancer storageLoadBalancer_;
     /* We use this interface to determine where replicas need to be placed */
     private AbstractReplicationStrategy nodePicker_;
     /* Are we starting this node in bootstrap mode? */
@@ -229,28 +201,19 @@
         }
     }
 
-    /*
-     * Registers with Management Server
-     */
-    private void init()
+    public StorageService()
     {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
         {
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            mbs.registerMBean(this, new ObjectName(
-                    "org.apache.cassandra.service:type=StorageService"));
+            mbs.registerMBean(this, new ObjectName("org.apache.cassandra.service:type=StorageService"));
         }
         catch (Exception e)
         {
-            logger_.error(LogUtil.throwableToString(e));
+            throw new RuntimeException(e);
         }
-    }
 
-    public StorageService()
-    {
         bootstrapSet = new HashSet<EndPoint>();
-        init();
-        storageLoadBalancer_ = new StorageLoadBalancer(this);
         endPointSnitch_ = DatabaseDescriptor.getEndPointSnitch();
 
         /* register the verb handlers */
@@ -259,37 +222,15 @@
         MessagingService.instance().registerVerbHandlers(mutationVerbHandler_, new RowMutationVerbHandler());
         MessagingService.instance().registerVerbHandlers(readRepairVerbHandler_, new ReadRepairVerbHandler());
         MessagingService.instance().registerVerbHandlers(readVerbHandler_, new ReadVerbHandler());
-        MessagingService.instance().registerVerbHandlers(bootStrapInitiateVerbHandler_, new Table.BootStrapInitiateVerbHandler());
-        MessagingService.instance().registerVerbHandlers(bootStrapInitiateDoneVerbHandler_, new StorageService.BootstrapInitiateDoneVerbHandler());
-        MessagingService.instance().registerVerbHandlers(bootStrapTerminateVerbHandler_, new StreamManager.BootstrapTerminateVerbHandler());
         MessagingService.instance().registerVerbHandlers(dataFileVerbHandler_, new DataFileVerbHandler() );
         MessagingService.instance().registerVerbHandlers(mbrshipCleanerVerbHandler_, new MembershipCleanerVerbHandler() );
-        MessagingService.instance().registerVerbHandlers(bsMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );
         MessagingService.instance().registerVerbHandlers(rangeVerbHandler_, new RangeVerbHandler());
-        MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new IVerbHandler()
-        {
-            public void doVerb(Message message)
-            {
-                List<String> tokens = getSplits(2);
-                assert tokens.size() == 3 : tokens.size();
-                Message response;
-                try
-                {
-                    response = message.getReply(getLocalStorageEndPoint(), tokens.get(1).getBytes("UTF-8"));
-                }
-                catch (UnsupportedEncodingException e)
-                {
-                    throw new AssertionError();
-                }
-                MessagingService.instance().sendOneWay(response, message.getFrom());
-            }
-        });
-        
-        /* register the stage for the mutations */
-        consistencyManager_ = new DebuggableThreadPoolExecutor(DatabaseDescriptor.getConsistencyThreads(),
-                                                               DatabaseDescriptor.getConsistencyThreads(),
-                                                               Integer.MAX_VALUE, TimeUnit.SECONDS,
-                                                               new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("CONSISTENCY-MANAGER"));
+        // see BootStrapper for a summary of how the bootstrap verbs interact
+        MessagingService.instance().registerVerbHandlers(bootstrapMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );
+        MessagingService.instance().registerVerbHandlers(bootStrapInitiateVerbHandler_, new BootStrapper.BootStrapInitiateVerbHandler());
+        MessagingService.instance().registerVerbHandlers(bootStrapInitiateDoneVerbHandler_, new BootStrapper.BootstrapInitiateDoneVerbHandler());
+        MessagingService.instance().registerVerbHandlers(bootStrapTerminateVerbHandler_, new BootStrapper.BootstrapTerminateVerbHandler());
+        MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
         
         StageManager.registerStage(StorageService.mutationStage_,
                                    new MultiThreadedStage(StorageService.mutationStage_, DatabaseDescriptor.getConcurrentWriters()));
@@ -323,55 +264,13 @@
         SelectorManager.getSelectorManager().start();
         SelectorManager.getUdpSelectorManager().start();
 
-        /* starts a load timer thread */
-        loadTimer_.schedule( new LoadDisseminator(), StorageService.threshold_, StorageService.threshold_);
-        
+        StorageLoadBalancer.instance().startBroadcasting();
+
         if (isBootstrapMode)
         {
-            logger_.info("Starting in bootstrap mode (first, sleeping to get load information)");
-            // wait for node information to be available.  if the rest of the cluster just came up,
-            // this could be up to threshold_ ms (currently 5 minutes).
-            try
-            {
-                while (storageLoadBalancer_.getLoadInfo().isEmpty())
-                {
-                    Thread.sleep(100);
-                }
-                // one more sleep in case there are some stragglers
-                Thread.sleep(BootStrapper.INITIAL_DELAY);
-            }
-            catch (InterruptedException e)
-            {
-                throw new AssertionError(e);
-            }
-
-            // if initialtoken was specified, use that.  otherwise, pick a token to assume half the load of the most-loaded node.
-            if (DatabaseDescriptor.getInitialToken() == null)
-            {
-                double maxLoad = 0;
-                EndPoint maxEndpoint = null;
-                for (Map.Entry<EndPoint,Double> entry : storageLoadBalancer_.getLoadInfo().entrySet())
-                {
-                    if (maxEndpoint == null || entry.getValue() > maxLoad)
-                    {
-                        maxEndpoint = entry.getKey();
-                        maxLoad = entry.getValue();
-                    }
-                }
-                if (!maxEndpoint.equals(getLocalStorageEndPoint()))
-                {
-                    Token<?> t = getBootstrapTokenFrom(maxEndpoint);
-                    logger_.info("Setting token to " + t + " to assume load from " + maxEndpoint.getHost());
-                    updateToken(t);
-                }
-            }
-
-            BootStrapper bs = new BootStrapper(new EndPoint[] {getLocalStorageEndPoint()}, storageMetadata_.getToken());
-            bootStrapper_.submit(bs);
-            Gossiper.instance().addApplicationState(BOOTSTRAP_MODE, new ApplicationState(""));
+            BootStrapper.startBootstrap();
         }
 
-        storageLoadBalancer_.start();
         Gossiper.instance().register(this);
         Gossiper.instance().start(udpAddr_, storageMetadata_.getGeneration());
         /* Make sure this token gets gossiped around. */
@@ -380,14 +279,6 @@
         Gossiper.instance().addApplicationState(StorageService.nodeId_, state);
     }
 
-    private Token<?> getBootstrapTokenFrom(EndPoint maxEndpoint)
-    {
-        Message message = new Message(getLocalStorageEndPoint(), "", bootstrapTokenVerbHandler_, ArrayUtils.EMPTY_BYTE_ARRAY);
-        BootstrapTokenCallback btc = new BootstrapTokenCallback();
-        MessagingService.instance().sendRR(message, maxEndpoint, btc);
-        return btc.getToken();
-    }
-
     public boolean isBootstrapMode()
     {
         return isBootstrapMode;
@@ -587,7 +478,7 @@
     public Map<String, String> getLoadMap()
     {
         Map<String, String> map = new HashMap<String, String>();
-        for (Map.Entry<EndPoint,Double> entry : storageLoadBalancer_.getLoadInfo().entrySet())
+        for (Map.Entry<EndPoint,Double> entry : StorageLoadBalancer.instance().getLoadInfo().entrySet())
         {
             map.put(entry.getKey().getHost(), FileUtils.stringifyFileSize(entry.getValue()));
         }
@@ -665,6 +556,11 @@
         HintedHandOffManager.instance().deliverHints(endpoint);
     }
 
+    public Token getLocalToken()
+    {
+        return tokenMetadata_.getToken(tcpAddr_);
+    }
+
     /* This methods belong to the MBean interface */
     
     public String getToken(EndPoint ep)
@@ -680,7 +576,7 @@
 
     public String getToken()
     {
-        return tokenMetadata_.getToken(StorageService.tcpAddr_).toString();
+        return getLocalToken().toString();
     }
 
     public Set<String> getLiveNodes()
@@ -1116,41 +1012,4 @@
         tokens.add(range.right().toString());
         return tokens;
     }
-
-    class BootstrapTokenCallback implements IAsyncCallback
-    {
-        private volatile Token<?> token;
-        private final Condition condition = new SimpleCondition();
-
-        public Token<?> getToken()
-        {
-            try
-            {
-                condition.await();
-            }
-            catch (InterruptedException e)
-            {
-                throw new RuntimeException(e);
-            }
-            return token;
-        }
-
-        public void response(Message msg)
-        {
-            try
-            {
-                token = partitioner_.getTokenFactory().fromString(new String(msg.getMessageBody(), "UTF-8"));
-            }
-            catch (UnsupportedEncodingException e)
-            {
-                throw new AssertionError();
-            }
-            condition.signalAll();
-        }
-
-        public void attachContext(Object o)
-        {
-            throw new UnsupportedOperationException();
-        }
-    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java Fri Oct  9 17:07:14 2009
@@ -39,45 +39,7 @@
 public final class StreamManager
 {   
     private static Logger logger_ = Logger.getLogger( StreamManager.class );
-    
-    public static class BootstrapTerminateVerbHandler implements IVerbHandler
-    {
-        private static Logger logger_ = Logger.getLogger( BootstrapTerminateVerbHandler.class );
-
-        public void doVerb(Message message)
-        {
-            byte[] body = message.getMessageBody();
-            DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(body, body.length);
-
-            try
-            {
-                StreamContextManager.StreamStatusMessage streamStatusMessage = StreamContextManager.StreamStatusMessage.serializer().deserialize(bufIn);
-                StreamContextManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
-                                               
-                switch( streamStatus.getAction() )
-                {
-                    case DELETE:                              
-                        StreamManager.instance(message.getFrom()).finish(streamStatus.getFile());
-                        break;
-
-                    case STREAM:
-                        if (logger_.isDebugEnabled())
-                          logger_.debug("Need to re-stream file " + streamStatus.getFile());
-                        StreamManager.instance(message.getFrom()).repeat();
-                        break;
-
-                    default:
-                        break;
-                }
-            }
-            catch ( IOException ex )
-            {
-                logger_.info(LogUtil.throwableToString(ex));
-            }
-        }
-    }
-    
+        
     private static Map<EndPoint, StreamManager> streamManagers_ = new HashMap<EndPoint, StreamManager>();
     
     public static StreamManager instance(EndPoint to)
@@ -111,7 +73,7 @@
         }
     }
     
-    void start()
+    public void start()
     {
         if ( filesToStream_.size() > 0 )
         {
@@ -122,13 +84,13 @@
         }
     }
     
-    void repeat()
+    public void repeat()
     {
         if ( filesToStream_.size() > 0 )
             start();
     }
     
-    void finish(String file) throws IOException
+    public void finish(String file) throws IOException
     {
         File f = new File(file);
         if (logger_.isDebugEnabled())

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Fri Oct  9 17:07:14 2009
@@ -31,6 +31,10 @@
 
 import static junit.framework.Assert.assertEquals;
 import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.CollatingOrderPreservingPartitioner;
 import org.apache.cassandra.db.filter.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
@@ -95,4 +99,37 @@
 
         TableTest.reTest(store, r);
     }
+
+    /**
+     * Writes out a bunch of keys into an SSTable, then runs anticompaction on a range.
+     * Checks to see if anticompaction returns true.
+     */
+    private void testAntiCompaction(String columnFamilyName, int insertsPerTable) throws IOException, ExecutionException, InterruptedException
+    {
+        Table table = Table.open("Keyspace1");
+        ColumnFamilyStore store = table.getColumnFamilyStore(columnFamilyName);
+
+        for (int j = 0; j < insertsPerTable; j++)
+        {
+            String key = String.valueOf(j);
+            RowMutation rm = new RowMutation("Keyspace1", key);
+            rm.add(new QueryPath(columnFamilyName, null, "0".getBytes()), new byte[0], j);
+            rm.apply();
+        }
+
+        store.forceBlockingFlush();
+        List<Range> ranges  = new ArrayList<Range>();
+        IPartitioner partitioner = new CollatingOrderPreservingPartitioner();
+        Range r = new Range(partitioner.getToken("0"), partitioner.getToken("zzzzzzz"));
+        ranges.add(r);
+
+        List<SSTableReader> fileList = store.forceAntiCompaction(ranges, new EndPoint("127.0.0.1", 9150));
+        assert fileList.size() >= 1;
+    }
+
+    @Test
+    public void testAntiCompaction1() throws IOException, ExecutionException, InterruptedException
+    {
+        testAntiCompaction("Standard1", 100);
+    }    
 }

Copied: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java (from r823562, incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java?p2=incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java&p1=incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java&r1=823562&r2=823616&rev=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java Fri Oct  9 17:07:14 2009
@@ -16,7 +16,7 @@
 * specific language governing permissions and limitations
 * under the License.
 */
-package org.apache.cassandra.db;
+package org.apache.cassandra.dht;
 
 import static junit.framework.Assert.assertEquals;
 import static org.junit.Assert.*;
@@ -38,39 +38,6 @@
 
 public class BootstrapTest
 {
-    /**
-     * Writes out a bunch of keys into an SSTable, then runs anticompaction on a range.
-     * Checks to see if anticompaction returns true.
-     */
-    private void testAntiCompaction(String columnFamilyName, int insertsPerTable) throws IOException, ExecutionException, InterruptedException
-    {
-        Table table = Table.open("Keyspace1");
-        ColumnFamilyStore store = table.getColumnFamilyStore(columnFamilyName);
-       
-        for (int j = 0; j < insertsPerTable; j++) 
-        {
-            String key = String.valueOf(j);
-            RowMutation rm = new RowMutation("Keyspace1", key);
-            rm.add(new QueryPath(columnFamilyName, null, "0".getBytes()), new byte[0], j);
-            rm.apply();
-        }
-        
-        store.forceBlockingFlush();
-        List<Range> ranges  = new ArrayList<Range>();
-        IPartitioner partitioner = new CollatingOrderPreservingPartitioner();
-        Range r = new Range(partitioner.getToken("0"), partitioner.getToken("zzzzzzz"));
-        ranges.add(r);
-
-        List<SSTableReader> fileList = store.forceAntiCompaction(ranges, new EndPoint("127.0.0.1", 9150));
-        assert fileList.size() >= 1;
-    }
-
-    @Test
-    public void testAntiCompaction1() throws IOException, ExecutionException, InterruptedException
-    {
-        testAntiCompaction("Standard1", 100);
-    }
-    
     @Test
     public void testGetNewNames() throws IOException
     {
@@ -78,7 +45,7 @@
         streamContexts[0] = new StreamContextManager.StreamContext("/foo/Standard1-500-Data.db", 100, "Keyspace1");
         streamContexts[1] = new StreamContextManager.StreamContext("/foo/Standard1-500-Index.db", 100, "Keyspace1");
         streamContexts[2] = new StreamContextManager.StreamContext("/foo/Standard1-500-Filter.db", 100, "Keyspace1");
-        Table.BootStrapInitiateVerbHandler bivh = new Table.BootStrapInitiateVerbHandler();
+        BootStrapper.BootStrapInitiateVerbHandler bivh = new BootStrapper.BootStrapInitiateVerbHandler();
         Map<String, String> fileNames = bivh.getNewNames(streamContexts);
         String result = fileNames.get("Keyspace1-Standard1-500");
         assertEquals(true, result.contains("Standard1"));
@@ -89,6 +56,4 @@
         assertTrue( new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, streamContexts[1])).getName().matches("Standard1-tmp-\\d+-Index.db"));
         assertTrue( new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, streamContexts[2])).getName().matches("Standard1-tmp-\\d+-Filter.db"));
     }
-
-    
 }