You are viewing a plain-text version of this content. (The original page linked "here" to its canonical version; that hyperlink was not preserved in this plain-text copy.)
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/27 15:40:29 UTC

svn commit: r830214 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/dht/

Author: jbellis
Date: Tue Oct 27 14:40:29 2009
New Revision: 830214

URL: http://svn.apache.org/viewvc?rev=830214&view=rev
Log:
refactor bootstrap to only concern itself with bootstrapping the local node, which greatly simplifies things
patch by jbellis; reviewed by goffinet for CASSANDRA-483

Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=830214&r1=830213&r2=830214&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Oct 27 14:40:29 2009
@@ -34,9 +34,9 @@
  import org.apache.log4j.Logger;
 
  import org.apache.commons.lang.ArrayUtils;
- import org.apache.commons.lang.StringUtils;
 
  import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.locator.AbstractReplicationStrategy;
  import org.apache.cassandra.net.*;
  import org.apache.cassandra.net.io.StreamContextManager;
  import org.apache.cassandra.net.io.IStreamComplete;
@@ -47,8 +47,8 @@
  import org.apache.cassandra.utils.SimpleCondition;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.gms.Gossiper;
- import org.apache.cassandra.gms.ApplicationState;
+ import org.apache.cassandra.gms.FailureDetector;
+ import org.apache.cassandra.gms.IFailureDetector;
  import org.apache.cassandra.io.DataInputBuffer;
  import org.apache.cassandra.io.SSTableReader;
  import org.apache.cassandra.io.SSTableWriter;
@@ -69,92 +69,52 @@
 {
     public static final long INITIAL_DELAY = 30 * 1000; //ms
 
-    static final Logger logger = Logger.getLogger(BootStrapper.class);
+    private static final Logger logger = Logger.getLogger(BootStrapper.class);
 
     /* endpoints that need to be bootstrapped */
-    protected final List<InetAddress> targets;
+    protected final InetAddress address;
     /* tokens of the nodes being bootstrapped. */
-    protected final Token[] tokens;
+    protected final Token token;
     protected final TokenMetadata tokenMetadata;
+    private final AbstractReplicationStrategy replicationStrategy;
 
-    public BootStrapper(List<InetAddress> targets, Token... token)
+    public BootStrapper(AbstractReplicationStrategy rs, InetAddress address, Token token, TokenMetadata tmd)
     {
-        this.targets = targets;
-        tokens = token;
-        tokenMetadata = StorageService.instance().getTokenMetadata();
+        assert address != null;
+        assert token != null;
+
+        replicationStrategy = rs;
+        this.address = address;
+        this.token = token;
+        tokenMetadata = tmd;
     }
     
-    Map<Range, List<BootstrapSourceTarget>> getRangesWithSourceTarget()
-    {
-        /* copy the token to endpoint map */
-        Map<Token, InetAddress> tokenToEndPointMap = tokenMetadata.cloneTokenEndPointMap();
-        /* remove the tokens associated with the endpoints being bootstrapped */                
-        for (Token token : tokens)
-        {
-            tokenToEndPointMap.remove(token);                    
-        }
-
-        Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
-        Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
-        if (logger.isDebugEnabled())
-          logger.debug("Total number of old ranges " + oldRanges.length);
-        /* 
-         * Find the ranges that are split. Maintain a mapping between
-         * the range being split and the list of subranges.
-        */                
-        Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRanges, tokens);
-        /* Calculate the list of nodes that handle the old ranges */
-        Map<Range, List<InetAddress>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges, tokenToEndPointMap);
-        /* Mapping of split ranges to the list of endpoints responsible for the range */                
-        Map<Range, List<InetAddress>> replicasForSplitRanges = new HashMap<Range, List<InetAddress>>();
-        Set<Range> rangesSplit = splitRanges.keySet();                
-        for ( Range splitRange : rangesSplit )
-        {
-            replicasForSplitRanges.put( splitRange, oldRangeToEndPointMap.get(splitRange) );
-        }                
-        /* Remove the ranges that are split. */
-        for ( Range splitRange : rangesSplit )
-        {
-            oldRangeToEndPointMap.remove(splitRange);
-        }
-        
-        /* Add the subranges of the split range to the map with the same replica set. */
-        for ( Range splitRange : rangesSplit )
-        {
-            List<Range> subRanges = splitRanges.get(splitRange);
-            List<InetAddress> replicas = replicasForSplitRanges.get(splitRange);
-            for ( Range subRange : subRanges )
-            {
-                /* Make sure we clone or else we are hammered. */
-                oldRangeToEndPointMap.put(subRange, new ArrayList<InetAddress>(replicas));
-            }
-        }                
-        
-        /* Add the new token and re-calculate the range assignments */
-        Collections.addAll( oldTokens, tokens);
-        Range[] newRanges = StorageService.instance().getAllRanges(oldTokens);
-
-        if (logger.isDebugEnabled())
-          logger.debug("Total number of new ranges " + newRanges.length);
-        /* Calculate the list of nodes that handle the new ranges */
-        Map<Range, List<InetAddress>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges);
-        /* Calculate ranges that need to be sent and from whom to where */
-        Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = LeaveJoinProtocolHelper.getRangeSourceTargetInfo(oldRangeToEndPointMap, newRangeToEndPointMap);
-        return rangesWithSourceTarget;
-    }
-
-    private static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint)
+    public void startBootstrap() throws IOException
     {
-        Message message = new Message(FBUtilities.getLocalAddress(), "", StorageService.bootstrapTokenVerbHandler_, ArrayUtils.EMPTY_BYTE_ARRAY);
-        BootstrapTokenCallback btc = new BootstrapTokenCallback();
-        MessagingService.instance().sendRR(message, maxEndpoint, btc);
-        return btc.getToken();
+        new Thread(new Runnable()
+        {
+            public void run()
+            {
+                Map<Range, Set<InetAddress>> rangesWithSourceTarget = getRangesWithSources();
+                if (logger.isDebugEnabled())
+                        logger.debug("Beginning bootstrap process for " + address + " ...");
+                /* Send messages to respective folks to stream data over to me */
+                for (Map.Entry<InetAddress, List<Range>> entry : getWorkMap(rangesWithSourceTarget).entrySet())
+                {
+                    InetAddress source = entry.getKey();
+                    BootstrapMetadata bsMetadata = new BootstrapMetadata(address, entry.getValue());
+                    Message message = BootstrapMetadataMessage.makeBootstrapMetadataMessage(new BootstrapMetadataMessage(bsMetadata));
+                    if (logger.isDebugEnabled())
+                        logger.debug("Sending the BootstrapMetadataMessage to " + source);
+                    MessagingService.instance().sendOneWay(message, source);
+                    StorageService.instance().addBootstrapSource(source);
+                }
+            }
+        }).start();
     }
 
-    public void startBootstrap() throws IOException
+    public static void guessTokenIfNotSpecified() throws IOException
     {
-        logger.info("Starting in bootstrap mode (first, sleeping to get load information)");
-
         StorageService ss = StorageService.instance();
         StorageLoadBalancer slb = StorageLoadBalancer.instance();
 
@@ -181,36 +141,76 @@
             if (!maxEndpoint.equals(FBUtilities.getLocalAddress()))
             {
                 Token<?> t = getBootstrapTokenFrom(maxEndpoint);
-                logger.info("Setting token to " + t + " to assume load from " + maxEndpoint);
+                logger.info("New token will be " + t + " to assume load from " + maxEndpoint);
                 ss.setToken(t);
             }
         }
+    }
 
-        new Thread(new Runnable()
+    Map<Range, Set<InetAddress>> getRangesWithSources()
+    {
+        Map<Token, InetAddress> map = tokenMetadata.cloneTokenEndPointMap();
+        assert map.size() > 0;
+        map.put(token, address);
+        Set<Range> myRanges = replicationStrategy.getAddressRanges(map).get(address);
+        map.remove(token);
+
+        Map<Range, Set<InetAddress>> myRangeAddresses = new HashMap<Range, Set<InetAddress>>();
+        Map<Range, Set<InetAddress>> rangeAddresses = replicationStrategy.getRangeAddresses(map);
+        for (Range range : rangeAddresses.keySet())
         {
-            public void run()
+            for (Range myRange : myRanges)
             {
-                // Mark as not bootstrapping to calculate ranges correctly
-                for (int i=0; i< targets.size(); i++)
+                if (range.contains(myRange.right()))
                 {
-                    tokenMetadata.setBootstrapping(targets.get(i), false);
+                    assert !myRangeAddresses.containsKey(myRange);
+                    myRangeAddresses.put(myRange, rangeAddresses.get(range));
                 }
+            }
+        }
+        return myRangeAddresses;
+    }
 
-                Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = getRangesWithSourceTarget();
-                if (logger.isDebugEnabled())
-                        logger.debug("Beginning bootstrap process for [" + StringUtils.join(targets, ", ") + "] ...");
-                /* Send messages to respective folks to stream data over to the new nodes being bootstrapped */
-                try
-                {
-                    LeaveJoinProtocolHelper.assignWork(rangesWithSourceTarget);
-                }
-                catch (IOException e)
+    private static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint)
+    {
+        Message message = new Message(FBUtilities.getLocalAddress(), "", StorageService.bootstrapTokenVerbHandler_, ArrayUtils.EMPTY_BYTE_ARRAY);
+        BootstrapTokenCallback btc = new BootstrapTokenCallback();
+        MessagingService.instance().sendRR(message, maxEndpoint, btc);
+        return btc.getToken();
+    }
+
+    static Map<InetAddress, List<Range>> getWorkMap(Map<Range, Set<InetAddress>> rangesWithSourceTarget)
+    {
+        return getWorkMap(rangesWithSourceTarget, FailureDetector.instance());
+    }
+
+    static Map<InetAddress, List<Range>> getWorkMap(Map<Range, Set<InetAddress>> rangesWithSourceTarget, IFailureDetector failureDetector)
+    {
+        /*
+         * Map whose key is the source node and the value is a map whose key is the
+         * target and value is the list of ranges to be sent to it.
+        */
+        Map<InetAddress, List<Range>> sources = new HashMap<InetAddress, List<Range>>();
+
+        // TODO look for contiguous ranges and map them to the same source
+        for (Range range : rangesWithSourceTarget.keySet())
+        {
+            for (InetAddress source : rangesWithSourceTarget.get(range))
+            {
+                if (failureDetector.isAlive(source))
                 {
-                    throw new RuntimeException(e);
+                    List<Range> ranges = sources.get(source);
+                    if (ranges == null)
+                    {
+                        ranges = new ArrayList<Range>();
+                        sources.put(source, ranges);
+                    }
+                    ranges.add(range);
+                    break;
                 }
             }
-        }).start();
-        Gossiper.instance().addApplicationState(StorageService.MODE, new ApplicationState(StorageService.MODE_MOVING));
+        }
+        return sources;
     }
 
     public static class BootstrapTokenVerbHandler implements IVerbHandler
@@ -403,11 +403,9 @@
                 String [] temp = fileName.split("-");
 
                 //Open the file to see if all parts are now here
-                SSTableReader sstable = null;
                 try
                 {
-                    sstable = SSTableWriter.renameAndOpen(streamContext.getTargetFile());
-
+                    SSTableReader sstable = SSTableWriter.renameAndOpen(streamContext.getTargetFile());
                     //TODO add a sanity check that this sstable has all its parts and is ok
                     Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
                     logger.info("Bootstrap added " + sstable.getFilename());

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java?rev=830214&r1=830213&r2=830214&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java Tue Oct 27 14:40:29 2009
@@ -18,10 +18,7 @@
 
 package org.apache.cassandra.dht;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
@@ -48,17 +45,25 @@
         return serializer_;
     }
     
-    protected static Message makeBootstrapMetadataMessage(BootstrapMetadataMessage bsMetadataMessage) throws IOException
+    protected static Message makeBootstrapMetadataMessage(BootstrapMetadataMessage bsMetadataMessage)
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
-        BootstrapMetadataMessage.serializer().serialize(bsMetadataMessage, dos);
+        try
+        {
+            BootstrapMetadataMessage.serializer().serialize(bsMetadataMessage, dos);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
         return new Message(FBUtilities.getLocalAddress(), "", StorageService.bootstrapMetadataVerbHandler_, bos.toByteArray() );
     }        
     
     protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];
-    
-    BootstrapMetadataMessage(BootstrapMetadata[] bsMetadata)
+
+    // TODO only actually ever need one BM, not an array
+    BootstrapMetadataMessage(BootstrapMetadata... bsMetadata)
     {
         assert bsMetadata != null;
         bsMetadata_ = bsMetadata;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=830214&r1=830213&r2=830214&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Oct 27 14:40:29 2009
@@ -263,7 +263,8 @@
         {
             logger_.info("Starting in bootstrap mode (first, sleeping to get load information)");
             Gossiper.instance().addApplicationState(MODE, new ApplicationState(MODE_MOVING));
-            new BootStrapper(Arrays.asList(FBUtilities.getLocalAddress()), getLocalToken()).startBootstrap(); // handles token update
+            BootStrapper.guessTokenIfNotSpecified();
+            new BootStrapper(replicationStrategy_, FBUtilities.getLocalAddress(), getLocalToken(), tokenMetadata_).startBootstrap(); // handles token update
         }
         else
         {

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=830214&r1=830213&r2=830214&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Tue Oct 27 14:40:29 2009
@@ -18,66 +18,82 @@
 */
 package org.apache.cassandra.dht;
 
-import static org.junit.Assert.*;
-
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Map;
-import java.util.Arrays;
+import java.util.Set;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import org.apache.commons.lang.StringUtils;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
 
+import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
-import org.junit.Test;
+import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
 
 public class BootStrapperTest {
     @Test
     public void testSourceTargetComputation() throws UnknownHostException
     {
-        int numOldNodes = 3;
-        IPartitioner p = generateOldTokens(numOldNodes);
+        testSourceTargetComputation(1);
+        testSourceTargetComputation(3);
+        testSourceTargetComputation(100);
+    }
+
+    private void testSourceTargetComputation(int numOldNodes) throws UnknownHostException
+    {
+        generateFakeEndpoints(numOldNodes);
         
-        Token newToken = p.getDefaultToken();
-        InetAddress newEndPoint = InetAddress.getByName("1.2.3.10");
+        Token myToken = StorageService.getPartitioner().getDefaultToken();
+        InetAddress myEndpoint = InetAddress.getByName("127.0.0.1");
  
-        /* New token needs to be part of the map for the algorithm
-         * to calculate the ranges correctly
-         */
-        StorageService.instance().updateTokenMetadataUnsafe(newToken, newEndPoint);
+        StorageService ss = StorageService.instance();
+        ss.updateTokenMetadataUnsafe(myToken, myEndpoint);
 
-        BootStrapper b = new BootStrapper(Arrays.asList(newEndPoint), newToken );
-        Map<Range,List<BootstrapSourceTarget>> res = b.getRangesWithSourceTarget();
+        TokenMetadata tmd = ss.getTokenMetadata();
+        assertEquals(numOldNodes + 1, tmd.cloneTokenEndPointMap().size());
+        BootStrapper b = new BootStrapper(ss.getReplicationStrategy(), myEndpoint, myToken, tmd);
+        Map<Range, Set<InetAddress>> res = b.getRangesWithSources();
         
         int transferCount = 0;
-        for ( Map.Entry<Range, List<BootstrapSourceTarget>> e: res.entrySet())
+        for (Map.Entry<Range, Set<InetAddress>> e : res.entrySet())
         {
-            if (e.getValue() != null && e.getValue().size() >0)
-            {
-                transferCount++;
-            }
+            assert e.getValue() != null && e.getValue().size() > 0 : StringUtils.join(e.getValue(), ", ");
+            transferCount++;
         }
+
         /* Only 1 transfer from old node to new node */
         assertEquals(1, transferCount);
-        Map<InetAddress, Map<InetAddress,List<Range>>> temp = LeaveJoinProtocolHelper.getWorkMap(res);
-        assertEquals(1, temp.keySet().size());
-        assertEquals(1, temp.entrySet().size());
+        IFailureDetector mockFailureDetector = new IFailureDetector()
+        {
+            public boolean isAlive(InetAddress ep)
+            {
+                return true;
+            }
 
-        Map<InetAddress,Map<InetAddress,List<Range>>> res2 = LeaveJoinProtocolHelper.filterRangesForTargetEndPoint(temp, newEndPoint);
-        /* After filtering, still only 1 transfer */
-        assertEquals(1, res2.keySet().size());
-        assertEquals(1, res2.entrySet().size());
-        assertTrue(((Map<InetAddress,List<Range>>)res2.values().toArray()[0]).containsKey(newEndPoint));
+            public void interpret(InetAddress ep) { throw new UnsupportedOperationException(); }
+            public void report(InetAddress ep) { throw new UnsupportedOperationException(); }
+            public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
+            public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
+        };
+        Map<InetAddress, List<Range>> temp = BootStrapper.getWorkMap(res, mockFailureDetector);
+        assertEquals(1, temp.keySet().size());
+        assertEquals(1, temp.values().iterator().next().size());
+        assert !temp.keySet().iterator().next().equals(myEndpoint);
     }
 
-    private IPartitioner generateOldTokens(int numOldNodes) throws UnknownHostException
+    private void generateFakeEndpoints(int numOldNodes) throws UnknownHostException
     {
-        IPartitioner p = new RandomPartitioner();
-        for (int i = 0 ; i< numOldNodes; i++)
+        TokenMetadata tmd = StorageService.instance().getTokenMetadata();
+        tmd.clearUnsafe();
+        IPartitioner<?> p = StorageService.getPartitioner();
+
+        for (int i = 1; i <= numOldNodes; i++)
         {
-            InetAddress e = InetAddress.getByName("127.0.0." + i);
-            Token t = p.getDefaultToken();
-            StorageService.instance().updateTokenMetadataUnsafe(t, e);
+            // leave .1 for myEndpoint
+            tmd.update(p.getDefaultToken(), InetAddress.getByName("127.0.0." + (i + 1)));
         }
-        return p;
     }
 }
\ No newline at end of file