You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 06:21:26 UTC

svn commit: r759022 - in /incubator/cassandra/trunk/src/org/apache/cassandra/dht: BootStrapper.java LeaveJoinProtocolHelper.java LeaveJoinProtocolImpl.java Range.java

Author: alakshman
Date: Fri Mar 27 05:21:26 2009
New Revision: 759022

URL: http://svn.apache.org/viewvc?rev=759022&view=rev
Log:
Re-checking them in to fix some stuff.

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java?rev=759022&r1=759021&r2=759022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java Fri Mar 27 05:21:26 2009
@@ -18,20 +18,24 @@
 
 package org.apache.cassandra.dht;
 
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
-
- import org.apache.log4j.Logger;
-
- import org.apache.cassandra.locator.TokenMetadata;
- import org.apache.cassandra.net.EndPoint;
- import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.utils.LogUtil;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
 
 
 /**
@@ -44,18 +48,18 @@
     /* endpoints that need to be bootstrapped */
     protected EndPoint[] targets_ = new EndPoint[0];
     /* tokens of the nodes being bootstapped. */
-    protected final Token[] tokens_;
+    protected BigInteger[] tokens_ = new BigInteger[0];
     protected TokenMetadata tokenMetadata_ = null;
     private List<EndPoint> filters_ = new ArrayList<EndPoint>();
 
-    public BootStrapper(EndPoint[] target, Token... token)
+    public BootStrapper(EndPoint[] target, BigInteger[] token)
     {
         targets_ = target;
         tokens_ = token;
         tokenMetadata_ = StorageService.instance().getTokenMetadata();
     }
     
-    public BootStrapper(EndPoint[] target, Token[] token, EndPoint[] filters)
+    public BootStrapper(EndPoint[] target, BigInteger[] token, EndPoint[] filters)
     {
         this(target, token);
         Collections.addAll(filters_, filters);
@@ -67,14 +71,14 @@
         {
             logger_.debug("Beginning bootstrap process for " + targets_ + " ...");                                                               
             /* copy the token to endpoint map */
-            Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+            Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
             /* remove the tokens associated with the endpoints being bootstrapped */                
-            for (Token token : tokens_)
+            for ( BigInteger token : tokens_ )
             {
                 tokenToEndPointMap.remove(token);                    
             }
 
-            Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
+            Set<BigInteger> oldTokens = new HashSet<BigInteger>( tokenToEndPointMap.keySet() );
             Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
             logger_.debug("Total number of old ranges " + oldRanges.length);
             /* 
@@ -126,5 +130,21 @@
             logger_.debug( LogUtil.throwableToString(th) );
         }
     }
+ 
+    private Range getMyOldRange()
+    {
+        Map<EndPoint, BigInteger> oldEndPointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
+        Map<BigInteger, EndPoint> oldTokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+
+        oldEndPointToTokenMap.remove(targets_);
+        oldTokenToEndPointMap.remove(tokens_);
 
+        BigInteger myToken = oldEndPointToTokenMap.get(StorageService.getLocalStorageEndPoint());
+        List<BigInteger> allTokens = new ArrayList<BigInteger>(oldTokenToEndPointMap.keySet());
+        Collections.sort(allTokens);
+        int index = Collections.binarySearch(allTokens, myToken);
+        /* Calculate the lhs for the range */
+        BigInteger lhs = (index == 0) ? allTokens.get(allTokens.size() - 1) : allTokens.get( index - 1);
+        return new Range( lhs, myToken );
+    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java?rev=759022&r1=759021&r2=759022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java Fri Mar 27 05:21:26 2009
@@ -18,19 +18,19 @@
 
 package org.apache.cassandra.dht;
 
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
-
- import org.apache.log4j.Logger;
-
- import org.apache.cassandra.net.EndPoint;
- import org.apache.cassandra.net.Message;
- import org.apache.cassandra.net.MessagingService;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.log4j.Logger;
 
 
 class LeaveJoinProtocolHelper
@@ -42,20 +42,20 @@
      * a-----x-----y-----b then we want a mapping from 
      * (a, b] --> (a, x], (x, y], (y, b] 
     */
-    protected static Map<Range, List<Range>> getRangeSplitRangeMapping(Range[] oldRanges, Token[] allTokens)
+    protected static Map<Range, List<Range>> getRangeSplitRangeMapping(Range[] oldRanges, BigInteger[] allTokens)
     {
         Map<Range, List<Range>> splitRanges = new HashMap<Range, List<Range>>();
-        Token[] tokens = new Token[allTokens.length];
+        BigInteger[] tokens = new BigInteger[allTokens.length];
         System.arraycopy(allTokens, 0, tokens, 0, tokens.length);
         Arrays.sort(tokens);
         
         Range prevRange = null;
-        Token prevToken = null;
+        BigInteger prevToken = null;
         boolean bVal = false;
         
         for ( Range oldRange : oldRanges )
         {
-            if (bVal)
+            if ( bVal && prevRange != null )
             {
                 bVal = false; 
                 List<Range> subRanges = splitRanges.get(prevRange);
@@ -65,7 +65,7 @@
             
             prevRange = oldRange;
             prevToken = oldRange.left();                
-            for (Token token : tokens)
+            for ( BigInteger token : tokens )
             {     
                 List<Range> subRanges = splitRanges.get(oldRange);
                 if ( oldRange.contains(token) )

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java?rev=759022&r1=759021&r2=759022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java Fri Mar 27 05:21:26 2009
@@ -18,20 +18,21 @@
 
 package org.apache.cassandra.dht;
 
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
- import org.apache.log4j.Logger;
-
- import org.apache.cassandra.locator.TokenMetadata;
- import org.apache.cassandra.net.EndPoint;
- import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
 
 
 /**
@@ -47,11 +48,11 @@
     /* endpoints that are to be moved. */
     protected EndPoint[] targets_ = new EndPoint[0];
     /* position where they need to be moved */
-    protected final Token[] tokens_;
+    protected BigInteger[] tokens_ = new BigInteger[0];
     /* token metadata information */
     protected TokenMetadata tokenMetadata_ = null;
 
-    public LeaveJoinProtocolImpl(EndPoint[] targets, Token[] tokens)
+    public LeaveJoinProtocolImpl(EndPoint[] targets, BigInteger[] tokens)
     {
         targets_ = targets;
         tokens_ = tokens;
@@ -64,24 +65,24 @@
         {
             logger_.debug("Beginning leave/join process for ...");                                                               
             /* copy the token to endpoint map */
-            Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+            Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
             /* copy the endpoint to token map */
-            Map<EndPoint, Token> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
+            Map<EndPoint, BigInteger> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
             
-            Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
+            Set<BigInteger> oldTokens = new HashSet<BigInteger>( tokenToEndPointMap.keySet() );
             Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
             logger_.debug("Total number of old ranges " + oldRanges.length);
             /* Calculate the list of nodes that handle the old ranges */
             Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges);
             
             /* Remove the tokens of the nodes leaving the ring */
-            Set<Token> tokens = getTokensForLeavingNodes();
+            Set<BigInteger> tokens = getTokensForLeavingNodes();
             oldTokens.removeAll(tokens);
             Range[] rangesAfterNodesLeave = StorageService.instance().getAllRanges(oldTokens);
             /* Get expanded range to initial range mapping */
             Map<Range, List<Range>> expandedRangeToOldRangeMap = getExpandedRangeToOldRangeMapping(oldRanges, rangesAfterNodesLeave);
             /* add the new token positions to the old tokens set */
-            for (Token token : tokens_)
+            for ( BigInteger token : tokens_ )
                 oldTokens.add(token);
             Range[] rangesAfterNodesJoin = StorageService.instance().getAllRanges(oldTokens);
             /* replace the ranges that were split with the split ranges in the old configuration */
@@ -195,12 +196,12 @@
         }        
     }
     
-    private Set<Token> getTokensForLeavingNodes()
+    private Set<BigInteger> getTokensForLeavingNodes()
     {
-        Set<Token> tokens = new HashSet<Token>();
+        Set<BigInteger> tokens = new HashSet<BigInteger>();
         for ( EndPoint target : targets_ )
         {
-            tokens.add(tokenMetadata_.getToken(target));
+            tokens.add( tokenMetadata_.getToken(target) );
         }        
         return tokens;
     }
@@ -275,16 +276,16 @@
     public static void main(String[] args) throws Throwable
     {
         StorageService ss = StorageService.instance();
-        ss.updateTokenMetadata(new BigIntegerToken("3"), new EndPoint("A", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("6"), new EndPoint("B", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("9"), new EndPoint("C", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("12"), new EndPoint("D", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("15"), new EndPoint("E", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("18"), new EndPoint("F", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("21"), new EndPoint("G", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("24"), new EndPoint("H", 7000));
+        ss.updateTokenMetadata(BigInteger.valueOf(3), new EndPoint("A", 7000));
+        ss.updateTokenMetadata(BigInteger.valueOf(6), new EndPoint("B", 7000));
+        ss.updateTokenMetadata(BigInteger.valueOf(9), new EndPoint("C", 7000));
+        ss.updateTokenMetadata(BigInteger.valueOf(12), new EndPoint("D", 7000));
+        ss.updateTokenMetadata(BigInteger.valueOf(15), new EndPoint("E", 7000));
+        ss.updateTokenMetadata(BigInteger.valueOf(18), new EndPoint("F", 7000));
+        ss.updateTokenMetadata(BigInteger.valueOf(21), new EndPoint("G", 7000));
+        ss.updateTokenMetadata(BigInteger.valueOf(24), new EndPoint("H", 7000)); 
         
-        Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new Token[]{new BigIntegerToken("22"), new BigIntegerToken("23")} );
+        Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new BigInteger[]{BigInteger.valueOf(22), BigInteger.valueOf(23)} );
         runnable.run();
     }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java?rev=759022&r1=759021&r2=759022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java Fri Mar 27 05:21:26 2009
@@ -21,9 +21,17 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
+import org.apache.cassandra.gms.GossipDigest;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.service.StorageService;
 
 
@@ -43,12 +51,28 @@
     public static ICompactSerializer<Range> serializer()
     {
         return serializer_;
-    }       
-
-    private Token left_;
-    private Token right_;
+    }
+    
+    public static boolean isKeyInRanges(List<Range> ranges, String key)
+    {
+        if(ranges == null ) 
+            return false;
+        
+        for ( Range range : ranges)
+        {
+            if(range.contains(StorageService.hash(key)))
+            {
+                return true ;
+            }
+        }
+        return false;
+    }
+        
     
-    public Range(Token left, Token right)
+    private BigInteger left_;
+    private BigInteger right_;
+    
+    public Range(BigInteger left, BigInteger right)
     {
         left_ = left;
         right_ = right;
@@ -58,7 +82,7 @@
      * Returns the left endpoint of a range.
      * @return left endpoint
      */
-    public Token left()
+    public BigInteger left()
     {
         return left_;
     }
@@ -67,20 +91,58 @@
      * Returns the right endpoint of a range.
      * @return right endpoint
      */
-    public Token right()
+    public BigInteger right()
     {
         return right_;
     }
-
+    
+    boolean isSplitRequired()
+    {
+        return ( left_.subtract(right_).signum() >= 0 );
+    }
+    
+    public boolean isSplitBy(BigInteger bi)
+    {
+        if ( left_.subtract(right_).signum() > 0 )
+        {
+            /* 
+             * left is greater than right we are wrapping around.
+             * So if the interval is [a,b) where a > b then we have
+             * 3 cases one of which holds for any given token k.
+             * (1) k > a -- return true
+             * (2) k < b -- return true
+             * (3) b < k < a -- return false
+            */
+            if ( bi.subtract(left_).signum() > 0 )
+                return true;
+            else if (right_.subtract(bi).signum() > 0 )
+                return true;
+            else
+                return false;
+        }
+        else if ( left_.subtract(right_).signum() < 0 )
+        {
+            /*
+             * This is the range [a, b) where a < b. 
+            */
+            return ( bi.subtract(left_).signum() > 0 && right_.subtract(bi).signum() > 0 );
+        }        
+        else
+        {
+            // should never be here.
+            return true;
+        }       
+    }
+    
     /**
      * Helps determine if a given point on the DHT ring is contained
      * in the range in question.
      * @param bi point in question
      * @return true if the point contains within the range else false.
      */
-    public boolean contains(Token bi)
+    public boolean contains(BigInteger bi)
     {
-        if ( left_.compareTo(right_) > 0 )
+        if ( left_.subtract(right_).signum() > 0 )
         {
             /* 
              * left is greater than right we are wrapping around.
@@ -90,31 +152,86 @@
              * (2) k < b -- return true
              * (3) b < k < a -- return false
             */
-            if ( bi.compareTo(left_) >= 0 )
+            if ( bi.subtract(left_).signum() >= 0 )
+                return true;
+            else if (right_.subtract(bi).signum() > 0 )
                 return true;
-            else return right_.compareTo(bi) > 0;
+            else
+                return false;
         }
-        else if ( left_.compareTo(right_) < 0 )
+        else if ( left_.subtract(right_).signum() < 0 )
         {
             /*
              * This is the range [a, b) where a < b. 
             */
-            return ( bi.compareTo(left_) >= 0 && right_.compareTo(bi) >=0 );
+            return ( bi.subtract(left_).signum() >= 0 && right_.subtract(bi).signum() >=0 );
         }        
         else
     	{
     		return true;
     	}    	
     }
-
+    
+    /**
+     * Helps determine if a given range on the DHT ring is contained
+     * within the range associated with the <i>this</i> pointer.
+     * @param rhs rhs in question
+     * @return true if the point contains within the range else false.
+     */
+    public boolean contains(Range rhs)
+    {
+        /* 
+         * If (a, b] and (c, d} are not wrap arounds
+         * then return true if a <= c <= d <= b.
+         */
+        if ( !isWrapAround(this) && !isWrapAround(rhs) )
+        {
+            if ( rhs.left_.subtract(left_).signum() >= 0 && right_.subtract(rhs.right_).signum() >= 0 )
+                return true;
+            else
+                return false;
+        }
+        
+        /*
+         * If lhs is a wrap around and rhs is not then
+         * rhs.left >= lhs.left and rhs.right >= lhs.left.
+         */
+        if ( isWrapAround(this) && !isWrapAround(rhs) )
+        {
+            if ( rhs.left_.subtract(left_).signum() >= 0 && rhs.right_.subtract(right_).signum() >= 0 )
+                return true;
+            else
+                return false;
+        }
+        
+        /* 
+         * If lhs is not a wrap around and rhs is a wrap 
+         * around then we just return false.
+         */
+        if ( !isWrapAround(this) && isWrapAround(rhs) )
+            return false;        
+        
+        if( isWrapAround(this) && isWrapAround(rhs) )
+        {
+            if ( rhs.left_.subtract(left_).signum() >= 0 && right_.subtract(right_).signum() >= 0 )
+                return true;
+            else
+                return false;
+        }
+        
+        /* should never be here */
+        return false;
+    }
+    
     /**
      * Tells if the given range is a wrap around.
      * @param range
      * @return
      */
-    private static boolean isWrapAround(Range range)
+    private boolean isWrapAround(Range range)
     {
-        return range.left_.compareTo(range.right_) > 0;
+        boolean bVal = ( range.left_.subtract(range.right_).signum() > 0 ) ? true : false;
+        return bVal;
     }
     
     public int compareTo(Range rhs)
@@ -132,28 +249,15 @@
         return right_.compareTo(rhs.right_);
     }
     
-
-    public static boolean isKeyInRanges(String key, List<Range> ranges)
-    {
-        assert ranges != null;
-
-        Token token = StorageService.token(key);
-        for (Range range : ranges)
-        {
-            if(range.contains(token))
-            {
-                return true;
-            }
-        }
-        return false;
-    }
-
     public boolean equals(Object o)
     {
         if ( !(o instanceof Range) )
             return false;
         Range rhs = (Range)o;
-        return left_.equals(rhs.left_) && right_.equals(rhs.right_);
+        if ( left_.equals(rhs.left_) && right_.equals(rhs.right_) )
+            return true;
+        else
+            return false;
     }
     
     public int hashCode()
@@ -170,13 +274,15 @@
 class RangeSerializer implements ICompactSerializer<Range>
 {
     public void serialize(Range range, DataOutputStream dos) throws IOException
-    {
-        Token.serializer().serialize(range.left(), dos);
-        Token.serializer().serialize(range.right(), dos);
+    {        
+        dos.writeUTF(range.left().toString());
+        dos.writeUTF(range.right().toString());
     }
 
     public Range deserialize(DataInputStream dis) throws IOException
     {
-        return new Range(Token.serializer().deserialize(dis), Token.serializer().deserialize(dis));
+        BigInteger left = new BigInteger(dis.readUTF());
+        BigInteger right = new BigInteger(dis.readUTF());        
+        return new Range(left, right);
     }
 }