You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/27 15:40:29 UTC
svn commit: r830214 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/service/
test/unit/org/apache/cassandra/dht/
Author: jbellis
Date: Tue Oct 27 14:40:29 2009
New Revision: 830214
URL: http://svn.apache.org/viewvc?rev=830214&view=rev
Log:
refactor bootstrap to only concern itself with bootstrapping the local node, which greatly simplifies things
patch by jbellis; reviewed by goffinet for CASSANDRA-483
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=830214&r1=830213&r2=830214&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Oct 27 14:40:29 2009
@@ -34,9 +34,9 @@
import org.apache.log4j.Logger;
import org.apache.commons.lang.ArrayUtils;
- import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.net.*;
import org.apache.cassandra.net.io.StreamContextManager;
import org.apache.cassandra.net.io.IStreamComplete;
@@ -47,8 +47,8 @@
import org.apache.cassandra.utils.SimpleCondition;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.gms.Gossiper;
- import org.apache.cassandra.gms.ApplicationState;
+ import org.apache.cassandra.gms.FailureDetector;
+ import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.io.SSTableWriter;
@@ -69,92 +69,52 @@
{
public static final long INITIAL_DELAY = 30 * 1000; //ms
- static final Logger logger = Logger.getLogger(BootStrapper.class);
+ private static final Logger logger = Logger.getLogger(BootStrapper.class);
/* endpoints that need to be bootstrapped */
- protected final List<InetAddress> targets;
+ protected final InetAddress address;
/* tokens of the nodes being bootstrapped. */
- protected final Token[] tokens;
+ protected final Token token;
protected final TokenMetadata tokenMetadata;
+ private final AbstractReplicationStrategy replicationStrategy;
- public BootStrapper(List<InetAddress> targets, Token... token)
+ public BootStrapper(AbstractReplicationStrategy rs, InetAddress address, Token token, TokenMetadata tmd)
{
- this.targets = targets;
- tokens = token;
- tokenMetadata = StorageService.instance().getTokenMetadata();
+ assert address != null;
+ assert token != null;
+
+ replicationStrategy = rs;
+ this.address = address;
+ this.token = token;
+ tokenMetadata = tmd;
}
- Map<Range, List<BootstrapSourceTarget>> getRangesWithSourceTarget()
- {
- /* copy the token to endpoint map */
- Map<Token, InetAddress> tokenToEndPointMap = tokenMetadata.cloneTokenEndPointMap();
- /* remove the tokens associated with the endpoints being bootstrapped */
- for (Token token : tokens)
- {
- tokenToEndPointMap.remove(token);
- }
-
- Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
- Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
- if (logger.isDebugEnabled())
- logger.debug("Total number of old ranges " + oldRanges.length);
- /*
- * Find the ranges that are split. Maintain a mapping between
- * the range being split and the list of subranges.
- */
- Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRanges, tokens);
- /* Calculate the list of nodes that handle the old ranges */
- Map<Range, List<InetAddress>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges, tokenToEndPointMap);
- /* Mapping of split ranges to the list of endpoints responsible for the range */
- Map<Range, List<InetAddress>> replicasForSplitRanges = new HashMap<Range, List<InetAddress>>();
- Set<Range> rangesSplit = splitRanges.keySet();
- for ( Range splitRange : rangesSplit )
- {
- replicasForSplitRanges.put( splitRange, oldRangeToEndPointMap.get(splitRange) );
- }
- /* Remove the ranges that are split. */
- for ( Range splitRange : rangesSplit )
- {
- oldRangeToEndPointMap.remove(splitRange);
- }
-
- /* Add the subranges of the split range to the map with the same replica set. */
- for ( Range splitRange : rangesSplit )
- {
- List<Range> subRanges = splitRanges.get(splitRange);
- List<InetAddress> replicas = replicasForSplitRanges.get(splitRange);
- for ( Range subRange : subRanges )
- {
- /* Make sure we clone or else we are hammered. */
- oldRangeToEndPointMap.put(subRange, new ArrayList<InetAddress>(replicas));
- }
- }
-
- /* Add the new token and re-calculate the range assignments */
- Collections.addAll( oldTokens, tokens);
- Range[] newRanges = StorageService.instance().getAllRanges(oldTokens);
-
- if (logger.isDebugEnabled())
- logger.debug("Total number of new ranges " + newRanges.length);
- /* Calculate the list of nodes that handle the new ranges */
- Map<Range, List<InetAddress>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges);
- /* Calculate ranges that need to be sent and from whom to where */
- Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = LeaveJoinProtocolHelper.getRangeSourceTargetInfo(oldRangeToEndPointMap, newRangeToEndPointMap);
- return rangesWithSourceTarget;
- }
-
- private static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint)
+ public void startBootstrap() throws IOException
{
- Message message = new Message(FBUtilities.getLocalAddress(), "", StorageService.bootstrapTokenVerbHandler_, ArrayUtils.EMPTY_BYTE_ARRAY);
- BootstrapTokenCallback btc = new BootstrapTokenCallback();
- MessagingService.instance().sendRR(message, maxEndpoint, btc);
- return btc.getToken();
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ Map<Range, Set<InetAddress>> rangesWithSourceTarget = getRangesWithSources();
+ if (logger.isDebugEnabled())
+ logger.debug("Beginning bootstrap process for " + address + " ...");
+ /* Send messages to respective folks to stream data over to me */
+ for (Map.Entry<InetAddress, List<Range>> entry : getWorkMap(rangesWithSourceTarget).entrySet())
+ {
+ InetAddress source = entry.getKey();
+ BootstrapMetadata bsMetadata = new BootstrapMetadata(address, entry.getValue());
+ Message message = BootstrapMetadataMessage.makeBootstrapMetadataMessage(new BootstrapMetadataMessage(bsMetadata));
+ if (logger.isDebugEnabled())
+ logger.debug("Sending the BootstrapMetadataMessage to " + source);
+ MessagingService.instance().sendOneWay(message, source);
+ StorageService.instance().addBootstrapSource(source);
+ }
+ }
+ }).start();
}
- public void startBootstrap() throws IOException
+ public static void guessTokenIfNotSpecified() throws IOException
{
- logger.info("Starting in bootstrap mode (first, sleeping to get load information)");
-
StorageService ss = StorageService.instance();
StorageLoadBalancer slb = StorageLoadBalancer.instance();
@@ -181,36 +141,76 @@
if (!maxEndpoint.equals(FBUtilities.getLocalAddress()))
{
Token<?> t = getBootstrapTokenFrom(maxEndpoint);
- logger.info("Setting token to " + t + " to assume load from " + maxEndpoint);
+ logger.info("New token will be " + t + " to assume load from " + maxEndpoint);
ss.setToken(t);
}
}
+ }
- new Thread(new Runnable()
+ Map<Range, Set<InetAddress>> getRangesWithSources()
+ {
+ Map<Token, InetAddress> map = tokenMetadata.cloneTokenEndPointMap();
+ assert map.size() > 0;
+ map.put(token, address);
+ Set<Range> myRanges = replicationStrategy.getAddressRanges(map).get(address);
+ map.remove(token);
+
+ Map<Range, Set<InetAddress>> myRangeAddresses = new HashMap<Range, Set<InetAddress>>();
+ Map<Range, Set<InetAddress>> rangeAddresses = replicationStrategy.getRangeAddresses(map);
+ for (Range range : rangeAddresses.keySet())
{
- public void run()
+ for (Range myRange : myRanges)
{
- // Mark as not bootstrapping to calculate ranges correctly
- for (int i=0; i< targets.size(); i++)
+ if (range.contains(myRange.right()))
{
- tokenMetadata.setBootstrapping(targets.get(i), false);
+ assert !myRangeAddresses.containsKey(myRange);
+ myRangeAddresses.put(myRange, rangeAddresses.get(range));
}
+ }
+ }
+ return myRangeAddresses;
+ }
- Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = getRangesWithSourceTarget();
- if (logger.isDebugEnabled())
- logger.debug("Beginning bootstrap process for [" + StringUtils.join(targets, ", ") + "] ...");
- /* Send messages to respective folks to stream data over to the new nodes being bootstrapped */
- try
- {
- LeaveJoinProtocolHelper.assignWork(rangesWithSourceTarget);
- }
- catch (IOException e)
+ private static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint)
+ {
+ Message message = new Message(FBUtilities.getLocalAddress(), "", StorageService.bootstrapTokenVerbHandler_, ArrayUtils.EMPTY_BYTE_ARRAY);
+ BootstrapTokenCallback btc = new BootstrapTokenCallback();
+ MessagingService.instance().sendRR(message, maxEndpoint, btc);
+ return btc.getToken();
+ }
+
+ static Map<InetAddress, List<Range>> getWorkMap(Map<Range, Set<InetAddress>> rangesWithSourceTarget)
+ {
+ return getWorkMap(rangesWithSourceTarget, FailureDetector.instance());
+ }
+
+ static Map<InetAddress, List<Range>> getWorkMap(Map<Range, Set<InetAddress>> rangesWithSourceTarget, IFailureDetector failureDetector)
+ {
+ /*
+ * Map whose key is the source node and whose value is the list of
+ * ranges to be fetched from that source.
+ */
+ Map<InetAddress, List<Range>> sources = new HashMap<InetAddress, List<Range>>();
+
+ // TODO look for contiguous ranges and map them to the same source
+ for (Range range : rangesWithSourceTarget.keySet())
+ {
+ for (InetAddress source : rangesWithSourceTarget.get(range))
+ {
+ if (failureDetector.isAlive(source))
{
- throw new RuntimeException(e);
+ List<Range> ranges = sources.get(source);
+ if (ranges == null)
+ {
+ ranges = new ArrayList<Range>();
+ sources.put(source, ranges);
+ }
+ ranges.add(range);
+ break;
}
}
- }).start();
- Gossiper.instance().addApplicationState(StorageService.MODE, new ApplicationState(StorageService.MODE_MOVING));
+ }
+ return sources;
}
public static class BootstrapTokenVerbHandler implements IVerbHandler
@@ -403,11 +403,9 @@
String [] temp = fileName.split("-");
//Open the file to see if all parts are now here
- SSTableReader sstable = null;
try
{
- sstable = SSTableWriter.renameAndOpen(streamContext.getTargetFile());
-
+ SSTableReader sstable = SSTableWriter.renameAndOpen(streamContext.getTargetFile());
//TODO add a sanity check that this sstable has all its parts and is ok
Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
logger.info("Bootstrap added " + sstable.getFilename());
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java?rev=830214&r1=830213&r2=830214&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java Tue Oct 27 14:40:29 2009
@@ -18,10 +18,7 @@
package org.apache.cassandra.dht;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
@@ -48,17 +45,25 @@
return serializer_;
}
- protected static Message makeBootstrapMetadataMessage(BootstrapMetadataMessage bsMetadataMessage) throws IOException
+ protected static Message makeBootstrapMetadataMessage(BootstrapMetadataMessage bsMetadataMessage)
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
- BootstrapMetadataMessage.serializer().serialize(bsMetadataMessage, dos);
+ try
+ {
+ BootstrapMetadataMessage.serializer().serialize(bsMetadataMessage, dos);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
return new Message(FBUtilities.getLocalAddress(), "", StorageService.bootstrapMetadataVerbHandler_, bos.toByteArray() );
}
protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];
-
- BootstrapMetadataMessage(BootstrapMetadata[] bsMetadata)
+
+ // TODO only actually ever need one BM, not an array
+ BootstrapMetadataMessage(BootstrapMetadata... bsMetadata)
{
assert bsMetadata != null;
bsMetadata_ = bsMetadata;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=830214&r1=830213&r2=830214&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Oct 27 14:40:29 2009
@@ -263,7 +263,8 @@
{
logger_.info("Starting in bootstrap mode (first, sleeping to get load information)");
Gossiper.instance().addApplicationState(MODE, new ApplicationState(MODE_MOVING));
- new BootStrapper(Arrays.asList(FBUtilities.getLocalAddress()), getLocalToken()).startBootstrap(); // handles token update
+ BootStrapper.guessTokenIfNotSpecified();
+ new BootStrapper(replicationStrategy_, FBUtilities.getLocalAddress(), getLocalToken(), tokenMetadata_).startBootstrap(); // handles token update
}
else
{
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=830214&r1=830213&r2=830214&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Tue Oct 27 14:40:29 2009
@@ -18,66 +18,82 @@
*/
package org.apache.cassandra.dht;
-import static org.junit.Assert.*;
-
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
-import java.util.Arrays;
+import java.util.Set;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import org.apache.commons.lang.StringUtils;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
-import org.junit.Test;
+import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
public class BootStrapperTest {
@Test
public void testSourceTargetComputation() throws UnknownHostException
{
- int numOldNodes = 3;
- IPartitioner p = generateOldTokens(numOldNodes);
+ testSourceTargetComputation(1);
+ testSourceTargetComputation(3);
+ testSourceTargetComputation(100);
+ }
+
+ private void testSourceTargetComputation(int numOldNodes) throws UnknownHostException
+ {
+ generateFakeEndpoints(numOldNodes);
- Token newToken = p.getDefaultToken();
- InetAddress newEndPoint = InetAddress.getByName("1.2.3.10");
+ Token myToken = StorageService.getPartitioner().getDefaultToken();
+ InetAddress myEndpoint = InetAddress.getByName("127.0.0.1");
- /* New token needs to be part of the map for the algorithm
- * to calculate the ranges correctly
- */
- StorageService.instance().updateTokenMetadataUnsafe(newToken, newEndPoint);
+ StorageService ss = StorageService.instance();
+ ss.updateTokenMetadataUnsafe(myToken, myEndpoint);
- BootStrapper b = new BootStrapper(Arrays.asList(newEndPoint), newToken );
- Map<Range,List<BootstrapSourceTarget>> res = b.getRangesWithSourceTarget();
+ TokenMetadata tmd = ss.getTokenMetadata();
+ assertEquals(numOldNodes + 1, tmd.cloneTokenEndPointMap().size());
+ BootStrapper b = new BootStrapper(ss.getReplicationStrategy(), myEndpoint, myToken, tmd);
+ Map<Range, Set<InetAddress>> res = b.getRangesWithSources();
int transferCount = 0;
- for ( Map.Entry<Range, List<BootstrapSourceTarget>> e: res.entrySet())
+ for (Map.Entry<Range, Set<InetAddress>> e : res.entrySet())
{
- if (e.getValue() != null && e.getValue().size() >0)
- {
- transferCount++;
- }
+ assert e.getValue() != null && e.getValue().size() > 0 : StringUtils.join(e.getValue(), ", ");
+ transferCount++;
}
+
/* Only 1 transfer from old node to new node */
assertEquals(1, transferCount);
- Map<InetAddress, Map<InetAddress,List<Range>>> temp = LeaveJoinProtocolHelper.getWorkMap(res);
- assertEquals(1, temp.keySet().size());
- assertEquals(1, temp.entrySet().size());
+ IFailureDetector mockFailureDetector = new IFailureDetector()
+ {
+ public boolean isAlive(InetAddress ep)
+ {
+ return true;
+ }
- Map<InetAddress,Map<InetAddress,List<Range>>> res2 = LeaveJoinProtocolHelper.filterRangesForTargetEndPoint(temp, newEndPoint);
- /* After filtering, still only 1 transfer */
- assertEquals(1, res2.keySet().size());
- assertEquals(1, res2.entrySet().size());
- assertTrue(((Map<InetAddress,List<Range>>)res2.values().toArray()[0]).containsKey(newEndPoint));
+ public void interpret(InetAddress ep) { throw new UnsupportedOperationException(); }
+ public void report(InetAddress ep) { throw new UnsupportedOperationException(); }
+ public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
+ public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
+ };
+ Map<InetAddress, List<Range>> temp = BootStrapper.getWorkMap(res, mockFailureDetector);
+ assertEquals(1, temp.keySet().size());
+ assertEquals(1, temp.values().iterator().next().size());
+ assert !temp.keySet().iterator().next().equals(myEndpoint);
}
- private IPartitioner generateOldTokens(int numOldNodes) throws UnknownHostException
+ private void generateFakeEndpoints(int numOldNodes) throws UnknownHostException
{
- IPartitioner p = new RandomPartitioner();
- for (int i = 0 ; i< numOldNodes; i++)
+ TokenMetadata tmd = StorageService.instance().getTokenMetadata();
+ tmd.clearUnsafe();
+ IPartitioner<?> p = StorageService.getPartitioner();
+
+ for (int i = 1; i <= numOldNodes; i++)
{
- InetAddress e = InetAddress.getByName("127.0.0." + i);
- Token t = p.getDefaultToken();
- StorageService.instance().updateTokenMetadataUnsafe(t, e);
+ // leave .1 for myEndpoint
+ tmd.update(p.getDefaultToken(), InetAddress.getByName("127.0.0." + (i + 1)));
}
- return p;
}
}
\ No newline at end of file