You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 06:21:26 UTC
svn commit: r759022 - in
/incubator/cassandra/trunk/src/org/apache/cassandra/dht: BootStrapper.java
LeaveJoinProtocolHelper.java LeaveJoinProtocolImpl.java Range.java
Author: alakshman
Date: Fri Mar 27 05:21:26 2009
New Revision: 759022
URL: http://svn.apache.org/viewvc?rev=759022&view=rev
Log:
Re-checking them in to fix some stuff.
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java?rev=759022&r1=759021&r2=759022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java Fri Mar 27 05:21:26 2009
@@ -18,20 +18,24 @@
package org.apache.cassandra.dht;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
-
- import org.apache.log4j.Logger;
-
- import org.apache.cassandra.locator.TokenMetadata;
- import org.apache.cassandra.net.EndPoint;
- import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.utils.LogUtil;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
/**
@@ -44,18 +48,18 @@
/* endpoints that need to be bootstrapped */
protected EndPoint[] targets_ = new EndPoint[0];
/* tokens of the nodes being bootstapped. */
- protected final Token[] tokens_;
+ protected BigInteger[] tokens_ = new BigInteger[0];
protected TokenMetadata tokenMetadata_ = null;
private List<EndPoint> filters_ = new ArrayList<EndPoint>();
- public BootStrapper(EndPoint[] target, Token... token)
+ public BootStrapper(EndPoint[] target, BigInteger[] token)
{
targets_ = target;
tokens_ = token;
tokenMetadata_ = StorageService.instance().getTokenMetadata();
}
- public BootStrapper(EndPoint[] target, Token[] token, EndPoint[] filters)
+ public BootStrapper(EndPoint[] target, BigInteger[] token, EndPoint[] filters)
{
this(target, token);
Collections.addAll(filters_, filters);
@@ -67,14 +71,14 @@
{
logger_.debug("Beginning bootstrap process for " + targets_ + " ...");
/* copy the token to endpoint map */
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
/* remove the tokens associated with the endpoints being bootstrapped */
- for (Token token : tokens_)
+ for ( BigInteger token : tokens_ )
{
tokenToEndPointMap.remove(token);
}
- Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
+ Set<BigInteger> oldTokens = new HashSet<BigInteger>( tokenToEndPointMap.keySet() );
Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
logger_.debug("Total number of old ranges " + oldRanges.length);
/*
@@ -126,5 +130,21 @@
logger_.debug( LogUtil.throwableToString(th) );
}
}
+
+ private Range getMyOldRange()
+ {
+ Map<EndPoint, BigInteger> oldEndPointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
+ Map<BigInteger, EndPoint> oldTokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+
+ oldEndPointToTokenMap.remove(targets_);
+ oldTokenToEndPointMap.remove(tokens_);
+ BigInteger myToken = oldEndPointToTokenMap.get(StorageService.getLocalStorageEndPoint());
+ List<BigInteger> allTokens = new ArrayList<BigInteger>(oldTokenToEndPointMap.keySet());
+ Collections.sort(allTokens);
+ int index = Collections.binarySearch(allTokens, myToken);
+ /* Calculate the lhs for the range */
+ BigInteger lhs = (index == 0) ? allTokens.get(allTokens.size() - 1) : allTokens.get( index - 1);
+ return new Range( lhs, myToken );
+ }
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java?rev=759022&r1=759021&r2=759022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java Fri Mar 27 05:21:26 2009
@@ -18,19 +18,19 @@
package org.apache.cassandra.dht;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
-
- import org.apache.log4j.Logger;
-
- import org.apache.cassandra.net.EndPoint;
- import org.apache.cassandra.net.Message;
- import org.apache.cassandra.net.MessagingService;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.log4j.Logger;
class LeaveJoinProtocolHelper
@@ -42,20 +42,20 @@
* a-----x-----y-----b then we want a mapping from
* (a, b] --> (a, x], (x, y], (y, b]
*/
- protected static Map<Range, List<Range>> getRangeSplitRangeMapping(Range[] oldRanges, Token[] allTokens)
+ protected static Map<Range, List<Range>> getRangeSplitRangeMapping(Range[] oldRanges, BigInteger[] allTokens)
{
Map<Range, List<Range>> splitRanges = new HashMap<Range, List<Range>>();
- Token[] tokens = new Token[allTokens.length];
+ BigInteger[] tokens = new BigInteger[allTokens.length];
System.arraycopy(allTokens, 0, tokens, 0, tokens.length);
Arrays.sort(tokens);
Range prevRange = null;
- Token prevToken = null;
+ BigInteger prevToken = null;
boolean bVal = false;
for ( Range oldRange : oldRanges )
{
- if (bVal)
+ if ( bVal && prevRange != null )
{
bVal = false;
List<Range> subRanges = splitRanges.get(prevRange);
@@ -65,7 +65,7 @@
prevRange = oldRange;
prevToken = oldRange.left();
- for (Token token : tokens)
+ for ( BigInteger token : tokens )
{
List<Range> subRanges = splitRanges.get(oldRange);
if ( oldRange.contains(token) )
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java?rev=759022&r1=759021&r2=759022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java Fri Mar 27 05:21:26 2009
@@ -18,20 +18,21 @@
package org.apache.cassandra.dht;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
- import org.apache.log4j.Logger;
-
- import org.apache.cassandra.locator.TokenMetadata;
- import org.apache.cassandra.net.EndPoint;
- import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
/**
@@ -47,11 +48,11 @@
/* endpoints that are to be moved. */
protected EndPoint[] targets_ = new EndPoint[0];
/* position where they need to be moved */
- protected final Token[] tokens_;
+ protected BigInteger[] tokens_ = new BigInteger[0];
/* token metadata information */
protected TokenMetadata tokenMetadata_ = null;
- public LeaveJoinProtocolImpl(EndPoint[] targets, Token[] tokens)
+ public LeaveJoinProtocolImpl(EndPoint[] targets, BigInteger[] tokens)
{
targets_ = targets;
tokens_ = tokens;
@@ -64,24 +65,24 @@
{
logger_.debug("Beginning leave/join process for ...");
/* copy the token to endpoint map */
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
/* copy the endpoint to token map */
- Map<EndPoint, Token> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
+ Map<EndPoint, BigInteger> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
- Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
+ Set<BigInteger> oldTokens = new HashSet<BigInteger>( tokenToEndPointMap.keySet() );
Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
logger_.debug("Total number of old ranges " + oldRanges.length);
/* Calculate the list of nodes that handle the old ranges */
Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges);
/* Remove the tokens of the nodes leaving the ring */
- Set<Token> tokens = getTokensForLeavingNodes();
+ Set<BigInteger> tokens = getTokensForLeavingNodes();
oldTokens.removeAll(tokens);
Range[] rangesAfterNodesLeave = StorageService.instance().getAllRanges(oldTokens);
/* Get expanded range to initial range mapping */
Map<Range, List<Range>> expandedRangeToOldRangeMap = getExpandedRangeToOldRangeMapping(oldRanges, rangesAfterNodesLeave);
/* add the new token positions to the old tokens set */
- for (Token token : tokens_)
+ for ( BigInteger token : tokens_ )
oldTokens.add(token);
Range[] rangesAfterNodesJoin = StorageService.instance().getAllRanges(oldTokens);
/* replace the ranges that were split with the split ranges in the old configuration */
@@ -195,12 +196,12 @@
}
}
- private Set<Token> getTokensForLeavingNodes()
+ private Set<BigInteger> getTokensForLeavingNodes()
{
- Set<Token> tokens = new HashSet<Token>();
+ Set<BigInteger> tokens = new HashSet<BigInteger>();
for ( EndPoint target : targets_ )
{
- tokens.add(tokenMetadata_.getToken(target));
+ tokens.add( tokenMetadata_.getToken(target) );
}
return tokens;
}
@@ -275,16 +276,16 @@
public static void main(String[] args) throws Throwable
{
StorageService ss = StorageService.instance();
- ss.updateTokenMetadata(new BigIntegerToken("3"), new EndPoint("A", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("6"), new EndPoint("B", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("9"), new EndPoint("C", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("12"), new EndPoint("D", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("15"), new EndPoint("E", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("18"), new EndPoint("F", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("21"), new EndPoint("G", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("24"), new EndPoint("H", 7000));
+ ss.updateTokenMetadata(BigInteger.valueOf(3), new EndPoint("A", 7000));
+ ss.updateTokenMetadata(BigInteger.valueOf(6), new EndPoint("B", 7000));
+ ss.updateTokenMetadata(BigInteger.valueOf(9), new EndPoint("C", 7000));
+ ss.updateTokenMetadata(BigInteger.valueOf(12), new EndPoint("D", 7000));
+ ss.updateTokenMetadata(BigInteger.valueOf(15), new EndPoint("E", 7000));
+ ss.updateTokenMetadata(BigInteger.valueOf(18), new EndPoint("F", 7000));
+ ss.updateTokenMetadata(BigInteger.valueOf(21), new EndPoint("G", 7000));
+ ss.updateTokenMetadata(BigInteger.valueOf(24), new EndPoint("H", 7000));
- Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new Token[]{new BigIntegerToken("22"), new BigIntegerToken("23")} );
+ Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new BigInteger[]{BigInteger.valueOf(22), BigInteger.valueOf(23)} );
runnable.run();
}
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java?rev=759022&r1=759021&r2=759022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java Fri Mar 27 05:21:26 2009
@@ -21,9 +21,17 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import org.apache.cassandra.gms.GossipDigest;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.service.StorageService;
@@ -43,12 +51,28 @@
public static ICompactSerializer<Range> serializer()
{
return serializer_;
- }
-
- private Token left_;
- private Token right_;
+ }
+
+ public static boolean isKeyInRanges(List<Range> ranges, String key)
+ {
+ if(ranges == null )
+ return false;
+
+ for ( Range range : ranges)
+ {
+ if(range.contains(StorageService.hash(key)))
+ {
+ return true ;
+ }
+ }
+ return false;
+ }
+
- public Range(Token left, Token right)
+ private BigInteger left_;
+ private BigInteger right_;
+
+ public Range(BigInteger left, BigInteger right)
{
left_ = left;
right_ = right;
@@ -58,7 +82,7 @@
* Returns the left endpoint of a range.
* @return left endpoint
*/
- public Token left()
+ public BigInteger left()
{
return left_;
}
@@ -67,20 +91,58 @@
* Returns the right endpoint of a range.
* @return right endpoint
*/
- public Token right()
+ public BigInteger right()
{
return right_;
}
-
+
+ boolean isSplitRequired()
+ {
+ return ( left_.subtract(right_).signum() >= 0 );
+ }
+
+ public boolean isSplitBy(BigInteger bi)
+ {
+ if ( left_.subtract(right_).signum() > 0 )
+ {
+ /*
+ * left is greater than right we are wrapping around.
+ * So if the interval is [a,b) where a > b then we have
+ * 3 cases one of which holds for any given token k.
+ * (1) k > a -- return true
+ * (2) k < b -- return true
+ * (3) b < k < a -- return false
+ */
+ if ( bi.subtract(left_).signum() > 0 )
+ return true;
+ else if (right_.subtract(bi).signum() > 0 )
+ return true;
+ else
+ return false;
+ }
+ else if ( left_.subtract(right_).signum() < 0 )
+ {
+ /*
+ * This is the range [a, b) where a < b.
+ */
+ return ( bi.subtract(left_).signum() > 0 && right_.subtract(bi).signum() > 0 );
+ }
+ else
+ {
+ // should never be here.
+ return true;
+ }
+ }
+
/**
* Helps determine if a given point on the DHT ring is contained
* in the range in question.
* @param bi point in question
* @return true if the point contains within the range else false.
*/
- public boolean contains(Token bi)
+ public boolean contains(BigInteger bi)
{
- if ( left_.compareTo(right_) > 0 )
+ if ( left_.subtract(right_).signum() > 0 )
{
/*
* left is greater than right we are wrapping around.
@@ -90,31 +152,86 @@
* (2) k < b -- return true
* (3) b < k < a -- return false
*/
- if ( bi.compareTo(left_) >= 0 )
+ if ( bi.subtract(left_).signum() >= 0 )
+ return true;
+ else if (right_.subtract(bi).signum() > 0 )
return true;
- else return right_.compareTo(bi) > 0;
+ else
+ return false;
}
- else if ( left_.compareTo(right_) < 0 )
+ else if ( left_.subtract(right_).signum() < 0 )
{
/*
* This is the range [a, b) where a < b.
*/
- return ( bi.compareTo(left_) >= 0 && right_.compareTo(bi) >=0 );
+ return ( bi.subtract(left_).signum() >= 0 && right_.subtract(bi).signum() >=0 );
}
else
{
return true;
}
}
-
+
+ /**
+ * Helps determine if a given range on the DHT ring is contained
+ * within the range associated with the <i>this</i> pointer.
+ * @param rhs rhs in question
+ * @return true if the point contains within the range else false.
+ */
+ public boolean contains(Range rhs)
+ {
+ /*
+ * If (a, b] and (c, d} are not wrap arounds
+ * then return true if a <= c <= d <= b.
+ */
+ if ( !isWrapAround(this) && !isWrapAround(rhs) )
+ {
+ if ( rhs.left_.subtract(left_).signum() >= 0 && right_.subtract(rhs.right_).signum() >= 0 )
+ return true;
+ else
+ return false;
+ }
+
+ /*
+ * If lhs is a wrap around and rhs is not then
+ * rhs.left >= lhs.left and rhs.right >= lhs.left.
+ */
+ if ( isWrapAround(this) && !isWrapAround(rhs) )
+ {
+ if ( rhs.left_.subtract(left_).signum() >= 0 && rhs.right_.subtract(right_).signum() >= 0 )
+ return true;
+ else
+ return false;
+ }
+
+ /*
+ * If lhs is not a wrap around and rhs is a wrap
+ * around then we just return false.
+ */
+ if ( !isWrapAround(this) && isWrapAround(rhs) )
+ return false;
+
+ if( isWrapAround(this) && isWrapAround(rhs) )
+ {
+ if ( rhs.left_.subtract(left_).signum() >= 0 && right_.subtract(right_).signum() >= 0 )
+ return true;
+ else
+ return false;
+ }
+
+ /* should never be here */
+ return false;
+ }
+
/**
* Tells if the given range is a wrap around.
* @param range
* @return
*/
- private static boolean isWrapAround(Range range)
+ private boolean isWrapAround(Range range)
{
- return range.left_.compareTo(range.right_) > 0;
+ boolean bVal = ( range.left_.subtract(range.right_).signum() > 0 ) ? true : false;
+ return bVal;
}
public int compareTo(Range rhs)
@@ -132,28 +249,15 @@
return right_.compareTo(rhs.right_);
}
-
- public static boolean isKeyInRanges(String key, List<Range> ranges)
- {
- assert ranges != null;
-
- Token token = StorageService.token(key);
- for (Range range : ranges)
- {
- if(range.contains(token))
- {
- return true;
- }
- }
- return false;
- }
-
public boolean equals(Object o)
{
if ( !(o instanceof Range) )
return false;
Range rhs = (Range)o;
- return left_.equals(rhs.left_) && right_.equals(rhs.right_);
+ if ( left_.equals(rhs.left_) && right_.equals(rhs.right_) )
+ return true;
+ else
+ return false;
}
public int hashCode()
@@ -170,13 +274,15 @@
class RangeSerializer implements ICompactSerializer<Range>
{
public void serialize(Range range, DataOutputStream dos) throws IOException
- {
- Token.serializer().serialize(range.left(), dos);
- Token.serializer().serialize(range.right(), dos);
+ {
+ dos.writeUTF(range.left().toString());
+ dos.writeUTF(range.right().toString());
}
public Range deserialize(DataInputStream dis) throws IOException
{
- return new Range(Token.serializer().deserialize(dis), Token.serializer().deserialize(dis));
+ BigInteger left = new BigInteger(dis.readUTF());
+ BigInteger right = new BigInteger(dis.readUTF());
+ return new Range(left, right);
}
}