You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC

svn commit: r799331 [12/29] - in /incubator/cassandra/trunk: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java Thu Jul 30 15:30:21 2009
@@ -1,292 +1,292 @@
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
-
- import org.apache.log4j.Logger;
-
- import org.apache.cassandra.locator.TokenMetadata;
- import org.apache.cassandra.net.EndPoint;
- import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.utils.LogUtil;
-
-
-/**
- * This class performs the exact opposite of the
- * operations of the BootStrapper class. Given 
- * a bunch of nodes that need to move it determines 
- * who they need to hand off data in terms of ranges.
-*/
-public class LeaveJoinProtocolImpl implements Runnable
-{
-    private static Logger logger_ = Logger.getLogger(LeaveJoinProtocolImpl.class);    
-    
-    /* endpoints that are to be moved. */
-    protected EndPoint[] targets_ = new EndPoint[0];
-    /* position where they need to be moved */
-    protected final Token[] tokens_;
-    /* token metadata information */
-    protected TokenMetadata tokenMetadata_ = null;
-
-    public LeaveJoinProtocolImpl(EndPoint[] targets, Token[] tokens)
-    {
-        targets_ = targets;
-        tokens_ = tokens;
-        tokenMetadata_ = StorageService.instance().getTokenMetadata();
-    }
-
-    public void run()
-    {  
-        try
-        {
-            if (logger_.isDebugEnabled())
-              logger_.debug("Beginning leave/join process for ...");                                                               
-            /* copy the token to endpoint map */
-            Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-            /* copy the endpoint to token map */
-            Map<EndPoint, Token> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
-            
-            Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
-            Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Total number of old ranges " + oldRanges.length);
-            /* Calculate the list of nodes that handle the old ranges */
-            Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges);
-            
-            /* Remove the tokens of the nodes leaving the ring */
-            Set<Token> tokens = getTokensForLeavingNodes();
-            oldTokens.removeAll(tokens);
-            Range[] rangesAfterNodesLeave = StorageService.instance().getAllRanges(oldTokens);
-            /* Get expanded range to initial range mapping */
-            Map<Range, List<Range>> expandedRangeToOldRangeMap = getExpandedRangeToOldRangeMapping(oldRanges, rangesAfterNodesLeave);
-            /* add the new token positions to the old tokens set */
-            for (Token token : tokens_)
-                oldTokens.add(token);
-            Range[] rangesAfterNodesJoin = StorageService.instance().getAllRanges(oldTokens);
-            /* replace the ranges that were split with the split ranges in the old configuration */
-            addSplitRangesToOldConfiguration(oldRangeToEndPointMap, rangesAfterNodesJoin);
-            
-            /* Re-calculate the new ranges after the new token positions are added */
-            Range[] newRanges = StorageService.instance().getAllRanges(oldTokens);
-            /* Remove the old locations from tokenToEndPointMap and add the new locations they are moving to */
-            for ( int i = 0; i < targets_.length; ++i )
-            {
-                tokenToEndPointMap.remove( endpointToTokenMap.get(targets_[i]) );
-                tokenToEndPointMap.put(tokens_[i], targets_[i]);
-            }            
-            /* Calculate the list of nodes that handle the new ranges */            
-            Map<Range, List<EndPoint>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges, tokenToEndPointMap);
-            /* Remove any expanded ranges and replace them with ranges whose aggregate is the expanded range in the new configuration. */
-            removeExpandedRangesFromNewConfiguration(newRangeToEndPointMap, expandedRangeToOldRangeMap);
-            /* Calculate ranges that need to be sent and from whom to where */
-            Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = LeaveJoinProtocolHelper.getRangeSourceTargetInfo(oldRangeToEndPointMap, newRangeToEndPointMap);
-            /* For debug purposes only */
-            Set<Range> ranges = rangesWithSourceTarget.keySet();
-            for ( Range range : ranges )
-            {
-                System.out.print("RANGE: " + range + ":: ");
-                List<BootstrapSourceTarget> infos = rangesWithSourceTarget.get(range);
-                for ( BootstrapSourceTarget info : infos )
-                {
-                    System.out.print(info);
-                    System.out.print(" ");
-                }
-                System.out.println(System.getProperty("line.separator"));
-            }
-            /* Send messages to respective folks to stream data over to the new nodes being bootstrapped */
-            LeaveJoinProtocolHelper.assignWork(rangesWithSourceTarget);
-        }
-        catch ( Throwable th )
-        {
-            logger_.warn(LogUtil.throwableToString(th));
-        }
-    }
-    
-    /**
-     * This method figures out the ranges that have been split and
-     * replaces them with the split range.
-     * @param oldRangeToEndPointMap old range mapped to their replicas.
-     * @param rangesAfterNodesJoin ranges after the nodes have joined at
-     *        their respective position.
-     */
-    private void addSplitRangesToOldConfiguration(Map<Range, List<EndPoint>> oldRangeToEndPointMap, Range[] rangesAfterNodesJoin)
-    {
-        /* 
-         * Find the ranges that are split. Maintain a mapping between
-         * the range being split and the list of subranges.
-        */                
-        Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRangeToEndPointMap.keySet().toArray( new Range[0] ), tokens_);
-        /* Mapping of split ranges to the list of endpoints responsible for the range */                
-        Map<Range, List<EndPoint>> replicasForSplitRanges = new HashMap<Range, List<EndPoint>>();                                
-        Set<Range> rangesSplit = splitRanges.keySet();                
-        for ( Range splitRange : rangesSplit )
-        {
-            replicasForSplitRanges.put( splitRange, oldRangeToEndPointMap.get(splitRange) );
-        }
-        /* Remove the ranges that are split. */
-        for ( Range splitRange : rangesSplit )
-        {
-            oldRangeToEndPointMap.remove(splitRange);
-        }
-        
-        /* Add the subranges of the split range to the map with the same replica set. */
-        for ( Range splitRange : rangesSplit )
-        {
-            List<Range> subRanges = splitRanges.get(splitRange);
-            List<EndPoint> replicas = replicasForSplitRanges.get(splitRange);
-            for ( Range subRange : subRanges )
-            {
-                /* Make sure we clone or else we are hammered. */
-                oldRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
-            }
-        }
-    }
-    
-    /**
-     * Reset the newRangeToEndPointMap and replace the expanded range
-     * with the ranges whose aggregate is the expanded range. This happens
-     * only when nodes leave the ring to migrate to a different position.
-     * 
-     * @param newRangeToEndPointMap all new ranges mapped to the replicas 
-     *        responsible for those ranges.
-     * @param expandedRangeToOldRangeMap mapping between the expanded ranges
-     *        and the ranges whose aggregate is the expanded range.
-     */
-    private void removeExpandedRangesFromNewConfiguration(Map<Range, List<EndPoint>> newRangeToEndPointMap, Map<Range, List<Range>> expandedRangeToOldRangeMap)
-    {
-        /* Get the replicas for the expanded ranges */
-        Map<Range, List<EndPoint>> replicasForExpandedRanges = new HashMap<Range, List<EndPoint>>();
-        Set<Range> expandedRanges = expandedRangeToOldRangeMap.keySet();
-        for ( Range expandedRange : expandedRanges )
-        {            
-            replicasForExpandedRanges.put( expandedRange, newRangeToEndPointMap.get(expandedRange) );
-            newRangeToEndPointMap.remove(expandedRange);            
-        }
-        /* replace the expanded ranges in the newRangeToEndPointMap with the subRanges */
-        for ( Range expandedRange : expandedRanges )
-        {
-            List<Range> subRanges = expandedRangeToOldRangeMap.get(expandedRange);
-            List<EndPoint> replicas = replicasForExpandedRanges.get(expandedRange);          
-            for ( Range subRange : subRanges )
-            {
-                newRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
-            }
-        }        
-    }
-    
-    private Set<Token> getTokensForLeavingNodes()
-    {
-        Set<Token> tokens = new HashSet<Token>();
-        for ( EndPoint target : targets_ )
-        {
-            tokens.add(tokenMetadata_.getToken(target));
-        }        
-        return tokens;
-    }
-    
-    /**
-     * Here we are removing the nodes that need to leave the
-     * ring and trying to calculate what the ranges would look
-     * like w/o them. e.g. if we remove two nodes A and D from
-     * the ring and the order of nodes on the ring is A, B, C
-     * and D. When B is removed the range of C is the old range 
-     * of C and the old range of B. We want a mapping from old
-     * range of B to new range of B. We have 
-     * A----B----C----D----E----F----G and we remove b and e
-     * then we want a mapping from (a, c] --> (a,b], (b, c] and 
- * (d, f] --> (d, e], (e, f].
-     * @param oldRanges ranges with the previous configuration
-     * @param newRanges ranges with the target endpoints removed.
-     * @return map of expanded range to the list whose aggregate is
-     *             the expanded range.
-     */
-    protected static Map<Range, List<Range>> getExpandedRangeToOldRangeMapping(Range[] oldRanges, Range[] newRanges)
-    {
-        Map<Range, List<Range>> map = new HashMap<Range, List<Range>>();   
-        List<Range> oRanges = new ArrayList<Range>();
-        Collections.addAll(oRanges, oldRanges);
-        List<Range> nRanges = new ArrayList<Range>();
-        Collections.addAll(nRanges, newRanges);
-        
-        /*
-         * Remove the ranges that are the same. 
-         * Now we will be left with the expanded 
-         * ranges in the nRanges list and the 
-         * smaller ranges in the oRanges list. 
-        */
-        for( Range oRange : oldRanges )
-        {            
-            boolean bVal = nRanges.remove(oRange);
-            if ( bVal )
-                oRanges.remove(oRange);
-        }
-        
-        int nSize = nRanges.size();
-        int oSize = oRanges.size();
-        /*
-         * Establish the mapping between expanded ranges
-         * to the smaller ranges whose aggregate is the
-         * expanded range. 
-        */
-        for ( int i = 0; i < nSize; ++i )
-        {
-            Range nRange = nRanges.get(i);
-            for ( int j = 0; j < oSize; ++j )
-            {
-                Range oRange = oRanges.get(j);
-                if ( nRange.contains(oRange.right()) )
-                {
-                    List<Range> smallerRanges = map.get(nRange);
-                    if ( smallerRanges == null )
-                    {
-                        smallerRanges = new ArrayList<Range>();
-                        map.put(nRange, smallerRanges);
-                    }
-                    smallerRanges.add(oRange);
-                    continue;
-                }
-            }
-        }
-        
-        return map;
-    }
-
-    public static void main(String[] args) throws Throwable
-    {
-        StorageService ss = StorageService.instance();
-        ss.updateTokenMetadata(new BigIntegerToken("3"), new EndPoint("A", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("6"), new EndPoint("B", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("9"), new EndPoint("C", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("12"), new EndPoint("D", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("15"), new EndPoint("E", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("18"), new EndPoint("F", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("21"), new EndPoint("G", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("24"), new EndPoint("H", 7000));
-        
-        Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new Token[]{new BigIntegerToken("22"), new BigIntegerToken("23")} );
-        runnable.run();
-    }
-}
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+
+ import org.apache.log4j.Logger;
+
+ import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.net.EndPoint;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.LogUtil;
+
+
+/**
+ * This class performs the exact opposite of the
+ * operations of the BootStrapper class. Given 
+ * a bunch of nodes that need to move it determines 
+ * who they need to hand off data in terms of ranges.
+*/
+public class LeaveJoinProtocolImpl implements Runnable
+{
+    private static Logger logger_ = Logger.getLogger(LeaveJoinProtocolImpl.class);    
+    
+    /* endpoints that are to be moved. */
+    protected EndPoint[] targets_ = new EndPoint[0];
+    /* position where they need to be moved */
+    protected final Token[] tokens_;
+    /* token metadata information */
+    protected TokenMetadata tokenMetadata_ = null;
+
+    public LeaveJoinProtocolImpl(EndPoint[] targets, Token[] tokens)
+    {
+        targets_ = targets;
+        tokens_ = tokens;
+        tokenMetadata_ = StorageService.instance().getTokenMetadata();
+    }
+
+    public void run()
+    {  
+        try
+        {
+            if (logger_.isDebugEnabled())
+              logger_.debug("Beginning leave/join process for ...");                                                               
+            /* copy the token to endpoint map */
+            Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+            /* copy the endpoint to token map */
+            Map<EndPoint, Token> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
+            
+            Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
+            Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
+            if (logger_.isDebugEnabled())
+              logger_.debug("Total number of old ranges " + oldRanges.length);
+            /* Calculate the list of nodes that handle the old ranges */
+            Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges);
+            
+            /* Remove the tokens of the nodes leaving the ring */
+            Set<Token> tokens = getTokensForLeavingNodes();
+            oldTokens.removeAll(tokens);
+            Range[] rangesAfterNodesLeave = StorageService.instance().getAllRanges(oldTokens);
+            /* Get expanded range to initial range mapping */
+            Map<Range, List<Range>> expandedRangeToOldRangeMap = getExpandedRangeToOldRangeMapping(oldRanges, rangesAfterNodesLeave);
+            /* add the new token positions to the old tokens set */
+            for (Token token : tokens_)
+                oldTokens.add(token);
+            Range[] rangesAfterNodesJoin = StorageService.instance().getAllRanges(oldTokens);
+            /* replace the ranges that were split with the split ranges in the old configuration */
+            addSplitRangesToOldConfiguration(oldRangeToEndPointMap, rangesAfterNodesJoin);
+            
+            /* Re-calculate the new ranges after the new token positions are added */
+            Range[] newRanges = StorageService.instance().getAllRanges(oldTokens);
+            /* Remove the old locations from tokenToEndPointMap and add the new locations they are moving to */
+            for ( int i = 0; i < targets_.length; ++i )
+            {
+                tokenToEndPointMap.remove( endpointToTokenMap.get(targets_[i]) );
+                tokenToEndPointMap.put(tokens_[i], targets_[i]);
+            }            
+            /* Calculate the list of nodes that handle the new ranges */            
+            Map<Range, List<EndPoint>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges, tokenToEndPointMap);
+            /* Remove any expanded ranges and replace them with ranges whose aggregate is the expanded range in the new configuration. */
+            removeExpandedRangesFromNewConfiguration(newRangeToEndPointMap, expandedRangeToOldRangeMap);
+            /* Calculate ranges that need to be sent and from whom to where */
+            Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = LeaveJoinProtocolHelper.getRangeSourceTargetInfo(oldRangeToEndPointMap, newRangeToEndPointMap);
+            /* For debug purposes only */
+            Set<Range> ranges = rangesWithSourceTarget.keySet();
+            for ( Range range : ranges )
+            {
+                System.out.print("RANGE: " + range + ":: ");
+                List<BootstrapSourceTarget> infos = rangesWithSourceTarget.get(range);
+                for ( BootstrapSourceTarget info : infos )
+                {
+                    System.out.print(info);
+                    System.out.print(" ");
+                }
+                System.out.println(System.getProperty("line.separator"));
+            }
+            /* Send messages to respective folks to stream data over to the new nodes being bootstrapped */
+            LeaveJoinProtocolHelper.assignWork(rangesWithSourceTarget);
+        }
+        catch ( Throwable th )
+        {
+            logger_.warn(LogUtil.throwableToString(th));
+        }
+    }
+    
+    /**
+     * This method figures out the ranges that have been split and
+     * replaces them with the split range.
+     * @param oldRangeToEndPointMap old range mapped to their replicas.
+     * @param rangesAfterNodesJoin ranges after the nodes have joined at
+     *        their respective position.
+     */
+    private void addSplitRangesToOldConfiguration(Map<Range, List<EndPoint>> oldRangeToEndPointMap, Range[] rangesAfterNodesJoin)
+    {
+        /* 
+         * Find the ranges that are split. Maintain a mapping between
+         * the range being split and the list of subranges.
+        */                
+        Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRangeToEndPointMap.keySet().toArray( new Range[0] ), tokens_);
+        /* Mapping of split ranges to the list of endpoints responsible for the range */                
+        Map<Range, List<EndPoint>> replicasForSplitRanges = new HashMap<Range, List<EndPoint>>();                                
+        Set<Range> rangesSplit = splitRanges.keySet();                
+        for ( Range splitRange : rangesSplit )
+        {
+            replicasForSplitRanges.put( splitRange, oldRangeToEndPointMap.get(splitRange) );
+        }
+        /* Remove the ranges that are split. */
+        for ( Range splitRange : rangesSplit )
+        {
+            oldRangeToEndPointMap.remove(splitRange);
+        }
+        
+        /* Add the subranges of the split range to the map with the same replica set. */
+        for ( Range splitRange : rangesSplit )
+        {
+            List<Range> subRanges = splitRanges.get(splitRange);
+            List<EndPoint> replicas = replicasForSplitRanges.get(splitRange);
+            for ( Range subRange : subRanges )
+            {
+                /* Make sure we clone or else we are hammered. */
+                oldRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
+            }
+        }
+    }
+    
+    /**
+     * Reset the newRangeToEndPointMap and replace the expanded range
+     * with the ranges whose aggregate is the expanded range. This happens
+     * only when nodes leave the ring to migrate to a different position.
+     * 
+     * @param newRangeToEndPointMap all new ranges mapped to the replicas 
+     *        responsible for those ranges.
+     * @param expandedRangeToOldRangeMap mapping between the expanded ranges
+     *        and the ranges whose aggregate is the expanded range.
+     */
+    private void removeExpandedRangesFromNewConfiguration(Map<Range, List<EndPoint>> newRangeToEndPointMap, Map<Range, List<Range>> expandedRangeToOldRangeMap)
+    {
+        /* Get the replicas for the expanded ranges */
+        Map<Range, List<EndPoint>> replicasForExpandedRanges = new HashMap<Range, List<EndPoint>>();
+        Set<Range> expandedRanges = expandedRangeToOldRangeMap.keySet();
+        for ( Range expandedRange : expandedRanges )
+        {            
+            replicasForExpandedRanges.put( expandedRange, newRangeToEndPointMap.get(expandedRange) );
+            newRangeToEndPointMap.remove(expandedRange);            
+        }
+        /* replace the expanded ranges in the newRangeToEndPointMap with the subRanges */
+        for ( Range expandedRange : expandedRanges )
+        {
+            List<Range> subRanges = expandedRangeToOldRangeMap.get(expandedRange);
+            List<EndPoint> replicas = replicasForExpandedRanges.get(expandedRange);          
+            for ( Range subRange : subRanges )
+            {
+                newRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
+            }
+        }        
+    }
+    
+    private Set<Token> getTokensForLeavingNodes()
+    {
+        Set<Token> tokens = new HashSet<Token>();
+        for ( EndPoint target : targets_ )
+        {
+            tokens.add(tokenMetadata_.getToken(target));
+        }        
+        return tokens;
+    }
+    
+    /**
+     * Here we are removing the nodes that need to leave the
+     * ring and trying to calculate what the ranges would look
+     * like w/o them. e.g. if we remove two nodes A and D from
+     * the ring and the order of nodes on the ring is A, B, C
+     * and D. When B is removed the range of C is the old range 
+     * of C and the old range of B. We want a mapping from old
+     * range of B to new range of B. We have 
+     * A----B----C----D----E----F----G and we remove b and e
+     * then we want a mapping from (a, c] --> (a,b], (b, c] and 
+ * (d, f] --> (d, e], (e, f].
+     * @param oldRanges ranges with the previous configuration
+     * @param newRanges ranges with the target endpoints removed.
+     * @return map of expanded range to the list whose aggregate is
+     *             the expanded range.
+     */
+    protected static Map<Range, List<Range>> getExpandedRangeToOldRangeMapping(Range[] oldRanges, Range[] newRanges)
+    {
+        Map<Range, List<Range>> map = new HashMap<Range, List<Range>>();   
+        List<Range> oRanges = new ArrayList<Range>();
+        Collections.addAll(oRanges, oldRanges);
+        List<Range> nRanges = new ArrayList<Range>();
+        Collections.addAll(nRanges, newRanges);
+        
+        /*
+         * Remove the ranges that are the same. 
+         * Now we will be left with the expanded 
+         * ranges in the nRanges list and the 
+         * smaller ranges in the oRanges list. 
+        */
+        for( Range oRange : oldRanges )
+        {            
+            boolean bVal = nRanges.remove(oRange);
+            if ( bVal )
+                oRanges.remove(oRange);
+        }
+        
+        int nSize = nRanges.size();
+        int oSize = oRanges.size();
+        /*
+         * Establish the mapping between expanded ranges
+         * to the smaller ranges whose aggregate is the
+         * expanded range. 
+        */
+        for ( int i = 0; i < nSize; ++i )
+        {
+            Range nRange = nRanges.get(i);
+            for ( int j = 0; j < oSize; ++j )
+            {
+                Range oRange = oRanges.get(j);
+                if ( nRange.contains(oRange.right()) )
+                {
+                    List<Range> smallerRanges = map.get(nRange);
+                    if ( smallerRanges == null )
+                    {
+                        smallerRanges = new ArrayList<Range>();
+                        map.put(nRange, smallerRanges);
+                    }
+                    smallerRanges.add(oRange);
+                    continue;
+                }
+            }
+        }
+        
+        return map;
+    }
+
+    public static void main(String[] args) throws Throwable
+    {
+        StorageService ss = StorageService.instance();
+        ss.updateTokenMetadata(new BigIntegerToken("3"), new EndPoint("A", 7000));
+        ss.updateTokenMetadata(new BigIntegerToken("6"), new EndPoint("B", 7000));
+        ss.updateTokenMetadata(new BigIntegerToken("9"), new EndPoint("C", 7000));
+        ss.updateTokenMetadata(new BigIntegerToken("12"), new EndPoint("D", 7000));
+        ss.updateTokenMetadata(new BigIntegerToken("15"), new EndPoint("E", 7000));
+        ss.updateTokenMetadata(new BigIntegerToken("18"), new EndPoint("F", 7000));
+        ss.updateTokenMetadata(new BigIntegerToken("21"), new EndPoint("G", 7000));
+        ss.updateTokenMetadata(new BigIntegerToken("24"), new EndPoint("H", 7000));
+        
+        Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new Token[]{new BigIntegerToken("22"), new BigIntegerToken("23")} );
+        runnable.run();
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java Thu Jul 30 15:30:21 2009
@@ -1,129 +1,129 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import java.io.UnsupportedEncodingException;
-import java.text.Collator;
-import java.util.Comparator;
-import java.util.Locale;
-import java.util.Random;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-
-public class OrderPreservingPartitioner implements IPartitioner
-{
-    // TODO make locale configurable.  But don't just leave it up to the OS or you could really screw
-    // people over if they deploy on nodes with different OS locales.
-    static final Collator collator = Collator.getInstance(new Locale("en", "US")); 
-
-    private static final Comparator<String> comparator = new Comparator<String>() {
-        public int compare(String o1, String o2)
-        {
-            return collator.compare(o1, o2);
-        }
-    };
-    private static final Comparator<String> reverseComparator = new Comparator<String>() {
-        public int compare(String o1, String o2)
-        {
-            return -comparator.compare(o1, o2);
-        }
-    };
-
-    public String decorateKey(String key)
-    {
-        return key;
-    }
-
-    public String undecorateKey(String decoratedKey)
-    {
-        return decoratedKey;
-    }
-
-    public Comparator<String> getDecoratedKeyComparator()
-    {
-        return comparator;
-    }
-
-    public Comparator<String> getReverseDecoratedKeyComparator()
-    {
-        return reverseComparator;
-    }
-
-    public StringToken getDefaultToken()
-    {
-        String initialToken = DatabaseDescriptor.getInitialToken();
-        if (initialToken != null)
-            return new StringToken(initialToken);
-
-        // generate random token
-        String chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
-        Random r = new Random();
-        StringBuilder buffer = new StringBuilder();
-        for (int j = 0; j < 16; j++) {
-            buffer.append(chars.charAt(r.nextInt(chars.length())));
-        }
-        return new StringToken(buffer.toString());
-    }
-
-    private final Token.TokenFactory<String> tokenFactory = new Token.TokenFactory<String>() {
-        public byte[] toByteArray(Token<String> stringToken)
-        {
-            try
-            {
-                return stringToken.token.getBytes("UTF-8");
-            }
-            catch (UnsupportedEncodingException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-
-        public Token<String> fromByteArray(byte[] bytes)
-        {
-            try
-            {
-                return new StringToken(new String(bytes, "UTF-8"));
-            }
-            catch (UnsupportedEncodingException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-
-        public String toString(Token<String> stringToken)
-        {
-            return stringToken.token;
-        }
-
-        public Token<String> fromString(String string)
-        {
-            return new StringToken(string);
-        }
-    };
-
-    public Token.TokenFactory<String> getTokenFactory()
-    {
-        return tokenFactory;
-    }
-
-    public Token getInitialToken(String key)
-    {
-        return new StringToken(key);
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.UnsupportedEncodingException;
+import java.text.Collator;
+import java.util.Comparator;
+import java.util.Locale;
+import java.util.Random;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+public class OrderPreservingPartitioner implements IPartitioner
+{
+    // TODO make locale configurable.  But don't just leave it up to the OS or you could really screw
+    // people over if they deploy on nodes with different OS locales.
+    static final Collator collator = Collator.getInstance(new Locale("en", "US")); 
+
+    private static final Comparator<String> comparator = new Comparator<String>() {
+        public int compare(String o1, String o2)
+        {
+            return collator.compare(o1, o2);
+        }
+    };
+    private static final Comparator<String> reverseComparator = new Comparator<String>() {
+        public int compare(String o1, String o2)
+        {
+            return -comparator.compare(o1, o2);
+        }
+    };
+
+    public String decorateKey(String key)
+    {
+        return key;
+    }
+
+    public String undecorateKey(String decoratedKey)
+    {
+        return decoratedKey;
+    }
+
+    public Comparator<String> getDecoratedKeyComparator()
+    {
+        return comparator;
+    }
+
+    public Comparator<String> getReverseDecoratedKeyComparator()
+    {
+        return reverseComparator;
+    }
+
+    public StringToken getDefaultToken()
+    {
+        String initialToken = DatabaseDescriptor.getInitialToken();
+        if (initialToken != null)
+            return new StringToken(initialToken);
+
+        // generate random token
+        String chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+        Random r = new Random();
+        StringBuilder buffer = new StringBuilder();
+        for (int j = 0; j < 16; j++) {
+            buffer.append(chars.charAt(r.nextInt(chars.length())));
+        }
+        return new StringToken(buffer.toString());
+    }
+
+    private final Token.TokenFactory<String> tokenFactory = new Token.TokenFactory<String>() {
+        public byte[] toByteArray(Token<String> stringToken)
+        {
+            try
+            {
+                return stringToken.token.getBytes("UTF-8");
+            }
+            catch (UnsupportedEncodingException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public Token<String> fromByteArray(byte[] bytes)
+        {
+            try
+            {
+                return new StringToken(new String(bytes, "UTF-8"));
+            }
+            catch (UnsupportedEncodingException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public String toString(Token<String> stringToken)
+        {
+            return stringToken.token;
+        }
+
+        public Token<String> fromString(String string)
+        {
+            return new StringToken(string);
+        }
+    };
+
+    public Token.TokenFactory<String> getTokenFactory()
+    {
+        return tokenFactory;
+    }
+
+    public Token getInitialToken(String key)
+    {
+        return new StringToken(key);
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java Thu Jul 30 15:30:21 2009
@@ -1,121 +1,121 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import java.math.BigInteger;
-import java.util.Comparator;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.GuidGenerator;
-
-/**
- * This class generates a BigIntegerToken using MD5 hash.
- */
-public class RandomPartitioner implements IPartitioner
-{
-    private static final Comparator<String> comparator = new Comparator<String>()
-    {
-        public int compare(String o1, String o2)
-        {
-            String[] split1 = o1.split(":", 2);
-            String[] split2 = o2.split(":", 2);
-            BigInteger i1 = new BigInteger(split1[0]);
-            BigInteger i2 = new BigInteger(split2[0]);
-            int v = i1.compareTo(i2);
-            if (v != 0) {
-                return v;
-            }
-            return split1[1].compareTo(split2[1]);
-        }
-    };
-    private static final Comparator<String> rcomparator = new Comparator<String>()
-    {
-        public int compare(String o1, String o2)
-        {
-            return -comparator.compare(o1, o2);
-        }
-    };
-
-    public String decorateKey(String key)
-    {
-        return FBUtilities.hash(key).toString() + ":" + key;
-    }
-
-    public String undecorateKey(String decoratedKey)
-    {
-        return decoratedKey.split(":", 2)[1];
-    }
-
-    public Comparator<String> getDecoratedKeyComparator()
-    {
-        return comparator;
-    }
-
-    public Comparator<String> getReverseDecoratedKeyComparator()
-    {
-        return rcomparator;
-    }
-
-    public BigIntegerToken getDefaultToken()
-    {
-        String initialToken = DatabaseDescriptor.getInitialToken();
-        if (initialToken != null)
-            return new BigIntegerToken(new BigInteger(initialToken));
-
-        // generate random token
-        String guid = GuidGenerator.guid();
-        BigInteger token = FBUtilities.hash(guid);
-        if ( token.signum() == -1 )
-            token = token.multiply(BigInteger.valueOf(-1L));
-        return new BigIntegerToken(token);
-    }
-
-    private final Token.TokenFactory<BigInteger> tokenFactory = new Token.TokenFactory<BigInteger>() {
-        public byte[] toByteArray(Token<BigInteger> bigIntegerToken)
-        {
-            return bigIntegerToken.token.toByteArray();
-        }
-
-        public Token<BigInteger> fromByteArray(byte[] bytes)
-        {
-            return new BigIntegerToken(new BigInteger(bytes));
-        }
-
-        public String toString(Token<BigInteger> bigIntegerToken)
-        {
-            return bigIntegerToken.token.toString();
-        }
-
-        public Token<BigInteger> fromString(String string)
-        {
-            return new BigIntegerToken(new BigInteger(string));
-        }
-    };
-
-    public Token.TokenFactory<BigInteger> getTokenFactory()
-    {
-        return tokenFactory;
-    }
-
-    public Token getInitialToken(String key)
-    {
-        return new BigIntegerToken(FBUtilities.hash(key));
-    }
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.math.BigInteger;
+import java.util.Comparator;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.GuidGenerator;
+
+/**
+ * This class generates a BigIntegerToken using MD5 hash.
+ */
+public class RandomPartitioner implements IPartitioner
+{
+    private static final Comparator<String> comparator = new Comparator<String>()
+    {
+        public int compare(String o1, String o2)
+        {
+            String[] split1 = o1.split(":", 2);
+            String[] split2 = o2.split(":", 2);
+            BigInteger i1 = new BigInteger(split1[0]);
+            BigInteger i2 = new BigInteger(split2[0]);
+            int v = i1.compareTo(i2);
+            if (v != 0) {
+                return v;
+            }
+            return split1[1].compareTo(split2[1]);
+        }
+    };
+    private static final Comparator<String> rcomparator = new Comparator<String>()
+    {
+        public int compare(String o1, String o2)
+        {
+            return -comparator.compare(o1, o2);
+        }
+    };
+
+    public String decorateKey(String key)
+    {
+        return FBUtilities.hash(key).toString() + ":" + key;
+    }
+
+    public String undecorateKey(String decoratedKey)
+    {
+        return decoratedKey.split(":", 2)[1];
+    }
+
+    public Comparator<String> getDecoratedKeyComparator()
+    {
+        return comparator;
+    }
+
+    public Comparator<String> getReverseDecoratedKeyComparator()
+    {
+        return rcomparator;
+    }
+
+    public BigIntegerToken getDefaultToken()
+    {
+        String initialToken = DatabaseDescriptor.getInitialToken();
+        if (initialToken != null)
+            return new BigIntegerToken(new BigInteger(initialToken));
+
+        // generate random token
+        String guid = GuidGenerator.guid();
+        BigInteger token = FBUtilities.hash(guid);
+        if ( token.signum() == -1 )
+            token = token.multiply(BigInteger.valueOf(-1L));
+        return new BigIntegerToken(token);
+    }
+
+    private final Token.TokenFactory<BigInteger> tokenFactory = new Token.TokenFactory<BigInteger>() {
+        public byte[] toByteArray(Token<BigInteger> bigIntegerToken)
+        {
+            return bigIntegerToken.token.toByteArray();
+        }
+
+        public Token<BigInteger> fromByteArray(byte[] bytes)
+        {
+            return new BigIntegerToken(new BigInteger(bytes));
+        }
+
+        public String toString(Token<BigInteger> bigIntegerToken)
+        {
+            return bigIntegerToken.token.toString();
+        }
+
+        public Token<BigInteger> fromString(String string)
+        {
+            return new BigIntegerToken(new BigInteger(string));
+        }
+    };
+
+    public Token.TokenFactory<BigInteger> getTokenFactory()
+    {
+        return tokenFactory;
+    }
+
+    public Token getInitialToken(String key)
+    {
+        return new BigIntegerToken(FBUtilities.hash(key));
+    }
 }
\ No newline at end of file

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java Thu Jul 30 15:30:21 2009
@@ -1,186 +1,186 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.math.BigInteger;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.service.StorageService;
-
-
-/**
- * A representation of the range that a node is responsible for on the DHT ring.
- *
- * A Range is responsible for the tokens between [left, right).
- * 
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class Range implements Comparable<Range>, Serializable
-{
-    private static ICompactSerializer<Range> serializer_;
-    static
-    {
-        serializer_ = new RangeSerializer();
-    }
-    
-    public static ICompactSerializer<Range> serializer()
-    {
-        return serializer_;
-    }
-
-    private final Token left_;
-    private final Token right_;
-
-    public Range(Token left, Token right)
-    {
-        left_ = left;
-        right_ = right;
-    }
-
-    /**
-     * Returns the left endpoint of a range.
-     * @return left endpoint
-     */
-    public Token left()
-    {
-        return left_;
-    }
-    
-    /**
-     * Returns the right endpoint of a range.
-     * @return right endpoint
-     */
-    public Token right()
-    {
-        return right_;
-    }
-
-    /**
-     * Helps determine if a given point on the DHT ring is contained
-     * in the range in question.
-     * @param bi point in question
-     * @return true if the point contains within the range else false.
-     */
-    public boolean contains(Token bi)
-    {
-        if ( left_.compareTo(right_) > 0 )
-        {
-            /* 
-             * left is greater than right we are wrapping around.
-             * So if the interval is [a,b) where a > b then we have
-             * 3 cases one of which holds for any given token k.
-             * (1) k > a -- return true
-             * (2) k < b -- return true
-             * (3) b < k < a -- return false
-            */
-            if ( bi.compareTo(left_) >= 0 )
-                return true;
-            else return right_.compareTo(bi) > 0;
-        }
-        else if ( left_.compareTo(right_) < 0 )
-        {
-            /*
-             * This is the range [a, b) where a < b. 
-            */
-            return ( bi.compareTo(left_) >= 0 && right_.compareTo(bi) > 0 );
-        }        
-        else
-    	{
-    		return true;
-    	}    	
-    }
-
-    /**
-     * Tells if the given range is a wrap around.
-     * @param range
-     * @return
-     */
-    private static boolean isWrapAround(Range range)
-    {
-        return range.left_.compareTo(range.right_) > 0;
-    }
-    
-    public int compareTo(Range rhs)
-    {
-        /* 
-         * If the range represented by the "this" pointer
-         * is a wrap around then it is the smaller one.
-        */
-        if ( isWrapAround(this) )
-            return -1;
-        
-        if ( isWrapAround(rhs) )
-            return 1;
-        
-        return right_.compareTo(rhs.right_);
-    }
-    
-
-    public static boolean isTokenInRanges(Token token, List<Range> ranges)
-    {
-        assert ranges != null;
-
-        for (Range range : ranges)
-        {
-            if(range.contains(token))
-            {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    public boolean equals(Object o)
-    {
-        if ( !(o instanceof Range) )
-            return false;
-        Range rhs = (Range)o;
-        return left_.equals(rhs.left_) && right_.equals(rhs.right_);
-    }
-    
-    public int hashCode()
-    {
-        return toString().hashCode();
-    }
-    
-    public String toString()
-    {
-        return "(" + left_ + "," + right_ + "]";
-    }
-}
-
-class RangeSerializer implements ICompactSerializer<Range>
-{
-    public void serialize(Range range, DataOutputStream dos) throws IOException
-    {
-        Token.serializer().serialize(range.left(), dos);
-        Token.serializer().serialize(range.right(), dos);
-    }
-
-    public Range deserialize(DataInputStream dis) throws IOException
-    {
-        return new Range(Token.serializer().deserialize(dis), Token.serializer().deserialize(dis));
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.math.BigInteger;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.service.StorageService;
+
+
+/**
+ * A representation of the range that a node is responsible for on the DHT ring.
+ *
+ * A Range is responsible for the tokens between [left, right).
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Range implements Comparable<Range>, Serializable
+{
+    private static ICompactSerializer<Range> serializer_;
+    static
+    {
+        serializer_ = new RangeSerializer();
+    }
+    
+    public static ICompactSerializer<Range> serializer()
+    {
+        return serializer_;
+    }
+
+    private final Token left_;
+    private final Token right_;
+
+    public Range(Token left, Token right)
+    {
+        left_ = left;
+        right_ = right;
+    }
+
+    /**
+     * Returns the left endpoint of a range.
+     * @return left endpoint
+     */
+    public Token left()
+    {
+        return left_;
+    }
+    
+    /**
+     * Returns the right endpoint of a range.
+     * @return right endpoint
+     */
+    public Token right()
+    {
+        return right_;
+    }
+
+    /**
+     * Helps determine if a given point on the DHT ring is contained
+     * in the range in question.
+     * @param bi point in question
+     * @return true if the point contains within the range else false.
+     */
+    public boolean contains(Token bi)
+    {
+        if ( left_.compareTo(right_) > 0 )
+        {
+            /* 
+             * left is greater than right we are wrapping around.
+             * So if the interval is [a,b) where a > b then we have
+             * 3 cases one of which holds for any given token k.
+             * (1) k > a -- return true
+             * (2) k < b -- return true
+             * (3) b < k < a -- return false
+            */
+            if ( bi.compareTo(left_) >= 0 )
+                return true;
+            else return right_.compareTo(bi) > 0;
+        }
+        else if ( left_.compareTo(right_) < 0 )
+        {
+            /*
+             * This is the range [a, b) where a < b. 
+            */
+            return ( bi.compareTo(left_) >= 0 && right_.compareTo(bi) > 0 );
+        }        
+        else
+    	{
+    		return true;
+    	}    	
+    }
+
+    /**
+     * Tells if the given range is a wrap around.
+     * @param range
+     * @return
+     */
+    private static boolean isWrapAround(Range range)
+    {
+        return range.left_.compareTo(range.right_) > 0;
+    }
+    
+    public int compareTo(Range rhs)
+    {
+        /* 
+         * If the range represented by the "this" pointer
+         * is a wrap around then it is the smaller one.
+        */
+        if ( isWrapAround(this) )
+            return -1;
+        
+        if ( isWrapAround(rhs) )
+            return 1;
+        
+        return right_.compareTo(rhs.right_);
+    }
+    
+
+    public static boolean isTokenInRanges(Token token, List<Range> ranges)
+    {
+        assert ranges != null;
+
+        for (Range range : ranges)
+        {
+            if(range.contains(token))
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean equals(Object o)
+    {
+        if ( !(o instanceof Range) )
+            return false;
+        Range rhs = (Range)o;
+        return left_.equals(rhs.left_) && right_.equals(rhs.right_);
+    }
+    
+    public int hashCode()
+    {
+        return toString().hashCode();
+    }
+    
+    public String toString()
+    {
+        return "(" + left_ + "," + right_ + "]";
+    }
+}
+
+class RangeSerializer implements ICompactSerializer<Range>
+{
+    public void serialize(Range range, DataOutputStream dos) throws IOException
+    {
+        Token.serializer().serialize(range.left(), dos);
+        Token.serializer().serialize(range.right(), dos);
+    }
+
+    public Range deserialize(DataInputStream dis) throws IOException
+    {
+        return new Range(Token.serializer().deserialize(dis), Token.serializer().deserialize(dis));
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java Thu Jul 30 15:30:21 2009
@@ -1,101 +1,101 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.gms;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.cassandra.io.ICompactSerializer;
-
-
-/**
- * This abstraction represents the state associated with a particular node which an
- * application wants to make available to the rest of the nodes in the cluster. 
- * Whenever a piece of state needs to be disseminated to the rest of cluster wrap
- * the state in an instance of <i>ApplicationState</i> and add it to the Gossiper.
- *  
- * e.g. if we want to disseminate load information for node A do the following:
- * 
- *      ApplicationState loadState = new ApplicationState(<string representation of load>);
- *      Gossiper.instance().addApplicationState("LOAD STATE", loadState);
- *  
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class ApplicationState
-{
-    private static ICompactSerializer<ApplicationState> serializer_;
-    static
-    {
-        serializer_ = new ApplicationStateSerializer();
-    }
-    
-    int version_;
-    String state_;
-
-        
-    ApplicationState(String state, int version)
-    {
-        state_ = state;
-        version_ = version;
-    }
-
-    public static ICompactSerializer<ApplicationState> serializer()
-    {
-        return serializer_;
-    }
-    
-    /**
-     * Wraps the specified state into a ApplicationState instance.
-     * @param state string representation of arbitrary state.
-     */
-    public ApplicationState(String state)
-    {
-        state_ = state;
-        version_ = VersionGenerator.getNextVersion();
-    }
-        
-    public String getState()
-    {
-        return state_;
-    }
-    
-    int getStateVersion()
-    {
-        return version_;
-    }
-}
-
-class ApplicationStateSerializer implements ICompactSerializer<ApplicationState>
-{
-    public void serialize(ApplicationState appState, DataOutputStream dos) throws IOException
-    {
-        dos.writeUTF(appState.state_);
-        dos.writeInt(appState.version_);
-    }
-
-    public ApplicationState deserialize(DataInputStream dis) throws IOException
-    {
-        String state = dis.readUTF();
-        int version = dis.readInt();
-        return new ApplicationState(state, version);
-    }
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+
+
+/**
+ * This abstraction represents the state associated with a particular node which an
+ * application wants to make available to the rest of the nodes in the cluster. 
+ * Whenever a piece of state needs to be disseminated to the rest of cluster wrap
+ * the state in an instance of <i>ApplicationState</i> and add it to the Gossiper.
+ *  
+ * e.g. if we want to disseminate load information for node A do the following:
+ * 
+ *      ApplicationState loadState = new ApplicationState(<string representation of load>);
+ *      Gossiper.instance().addApplicationState("LOAD STATE", loadState);
+ *  
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ApplicationState
+{
+    private static ICompactSerializer<ApplicationState> serializer_;
+    static
+    {
+        serializer_ = new ApplicationStateSerializer();
+    }
+    
+    int version_;
+    String state_;
+
+        
+    ApplicationState(String state, int version)
+    {
+        state_ = state;
+        version_ = version;
+    }
+
+    public static ICompactSerializer<ApplicationState> serializer()
+    {
+        return serializer_;
+    }
+    
+    /**
+     * Wraps the specified state into a ApplicationState instance.
+     * @param state string representation of arbitrary state.
+     */
+    public ApplicationState(String state)
+    {
+        state_ = state;
+        version_ = VersionGenerator.getNextVersion();
+    }
+        
+    public String getState()
+    {
+        return state_;
+    }
+    
+    int getStateVersion()
+    {
+        return version_;
+    }
+}
+
+class ApplicationStateSerializer implements ICompactSerializer<ApplicationState>
+{
+    public void serialize(ApplicationState appState, DataOutputStream dos) throws IOException
+    {
+        dos.writeUTF(appState.state_);
+        dos.writeInt(appState.version_);
+    }
+
+    public ApplicationState deserialize(DataInputStream dis) throws IOException
+    {
+        String state = dis.readUTF();
+        int version = dis.readInt();
+        return new ApplicationState(state, version);
+    }
+}
+

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java Thu Jul 30 15:30:21 2009
@@ -1,184 +1,184 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.gms;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.*;
-import org.apache.cassandra.io.ICompactSerializer;
-
-import org.apache.log4j.Logger;
-
-/**
- * This abstraction represents both the HeartBeatState and the ApplicationState in an EndPointState
- * instance. Any state for a given endpoint can be retrieved from this instance.
- * 
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class EndPointState
-{
-    // Shared serializer for all EndPointState instances; created once at class load.
-    private static ICompactSerializer<EndPointState> serializer_;
-    static
-    {
-        serializer_ = new EndPointStateSerializer();
-    }
-    
-    HeartBeatState hbState_;
-    // Per-key application states; Hashtable provides synchronized access,
-    // presumably because gossip threads read/write this map concurrently — TODO confirm.
-    Map<String, ApplicationState> applicationState_ = new Hashtable<String, ApplicationState>();
-    
-    /* fields below do not get serialized */
-    long updateTimestamp_;  // local wall-clock time of the last update; see updateTimestamp()
-    boolean isAlive_;       // liveness flag, maintained externally via isAlive(boolean)
-    boolean isAGossiper_;   // gossip-participation flag, maintained via isAGossiper(boolean)
-
-    // Returns the shared compact serializer for this type.
-    public static ICompactSerializer<EndPointState> serializer()
-    {
-        return serializer_;
-    }
-    
-    // A new endpoint starts alive, with a fresh update timestamp, and is not
-    // yet marked as a gossiper.
-    EndPointState(HeartBeatState hbState) 
-    { 
-        hbState_ = hbState; 
-        updateTimestamp_ = System.currentTimeMillis(); 
-        isAlive_ = true; 
-        isAGossiper_ = false;
-    }
-        
-    HeartBeatState getHeartBeatState()
-    {
-        return hbState_;
-    }
-    
-    // Replaces the heartbeat and records the local time of the change.
-    synchronized void setHeartBeatState(HeartBeatState hbState)
-    {
-        updateTimestamp();
-        hbState_ = hbState;
-    }
-    
-    // Returns the application state for the given key, or null if absent.
-    public ApplicationState getApplicationState(String key)
-    {
-        return applicationState_.get(key);
-    }
-    
-    // NOTE(review): returns the internal mutable map directly; callers can
-    // modify this endpoint's state through it.
-    public Map<String, ApplicationState> getApplicationState()
-    {
-        return applicationState_;
-    }
-    
-    // Adds or replaces the application state stored under the given key.
-    void addApplicationState(String key, ApplicationState appState)
-    {        
-        applicationState_.put(key, appState);        
-    }
-
-    /* getters and setters */
-    long getUpdateTimestamp()
-    {
-        return updateTimestamp_;
-    }
-    
-    synchronized void updateTimestamp()
-    {
-        updateTimestamp_ = System.currentTimeMillis();
-    }
-    
-    public boolean isAlive()
-    {        
-        return isAlive_;
-    }
-
-    synchronized void isAlive(boolean value)
-    {        
-        isAlive_ = value;        
-    }
-
-    
-    boolean isAGossiper()
-    {        
-        return isAGossiper_;
-    }
-
-    synchronized void isAGossiper(boolean value)
-    {                
-        //isAlive_ = false;
-        isAGossiper_ = value;        
-    }
-}
-
-class EndPointStateSerializer implements ICompactSerializer<EndPointState>
-{
-    private static Logger logger_ = Logger.getLogger(EndPointStateSerializer.class);
-    
-    // Serializes the heartbeat followed by up to `size` (key, ApplicationState)
-    // pairs, stopping early if the next entry would exceed the gossip MTU.
-    public void serialize(EndPointState epState, DataOutputStream dos) throws IOException
-    {
-        /* These are for estimating whether we overshoot the MTU limit */
-        int estimate = 0;
-
-        /* serialize the HeartBeatState */
-        HeartBeatState hbState = epState.getHeartBeatState();
-        HeartBeatState.serializer().serialize(hbState, dos);
-
-        /* serialize the map of ApplicationState objects */
-        int size = epState.applicationState_.size();
-        // NOTE(review): the full count is written even though entries may be
-        // dropped below to respect the MTU; deserialize() compensates by
-        // checking available() == 0 before each read.
-        dos.writeInt(size);
-        if ( size > 0 )
-        {   
-            Set<String> keys = epState.applicationState_.keySet();
-            for( String key : keys )
-            {
-                // Stop if the remaining budget is smaller than the size of the
-                // previously written entry (our estimate for the next one).
-                if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
-                {
-                    logger_.info("@@@@ Breaking out to respect the MTU size in EndPointState serializer. Estimate is " + estimate + " @@@@");
-                    break;
-                }
-            
-                ApplicationState appState = epState.applicationState_.get(key);
-                if ( appState != null )
-                {
-                    int pre = dos.size();
-                    dos.writeUTF(key);
-                    ApplicationState.serializer().serialize(appState, dos);                    
-                    int post = dos.size();
-                    // Size of the entry just written; used as the estimate above.
-                    estimate = post - pre;
-                }                
-            }
-        }
-    }
-
-    // Reads the heartbeat and then up to appStateSize entries; the serializer
-    // may have truncated the stream to respect the MTU, so fewer entries than
-    // the declared count may actually be present.
-    public EndPointState deserialize(DataInputStream dis) throws IOException
-    {
-        HeartBeatState hbState = HeartBeatState.serializer().deserialize(dis);
-        EndPointState epState = new EndPointState(hbState);               
-
-        int appStateSize = dis.readInt();
-        for ( int i = 0; i < appStateSize; ++i )
-        {
-            // Guard against truncation by the MTU check in serialize().
-            if ( dis.available() == 0 )
-            {
-                break;
-            }
-            
-            String key = dis.readUTF();    
-            ApplicationState appState = ApplicationState.serializer().deserialize(dis);            
-            epState.addApplicationState(key, appState);            
-        }
-        return epState;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+import org.apache.cassandra.io.ICompactSerializer;
+
+import org.apache.log4j.Logger;
+
+/**
+ * This abstraction represents both the HeartBeatState and the ApplicationState in an EndPointState
+ * instance. Any state for a given endpoint can be retrieved from this instance.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class EndPointState
+{
+    // Shared serializer for all EndPointState instances; created once at class load.
+    private static ICompactSerializer<EndPointState> serializer_;
+    static
+    {
+        serializer_ = new EndPointStateSerializer();
+    }
+    
+    HeartBeatState hbState_;
+    // Per-key application states; Hashtable provides synchronized access,
+    // presumably because gossip threads read/write this map concurrently — TODO confirm.
+    Map<String, ApplicationState> applicationState_ = new Hashtable<String, ApplicationState>();
+    
+    /* fields below do not get serialized */
+    long updateTimestamp_;  // local wall-clock time of the last update; see updateTimestamp()
+    boolean isAlive_;       // liveness flag, maintained externally via isAlive(boolean)
+    boolean isAGossiper_;   // gossip-participation flag, maintained via isAGossiper(boolean)
+
+    // Returns the shared compact serializer for this type.
+    public static ICompactSerializer<EndPointState> serializer()
+    {
+        return serializer_;
+    }
+    
+    // A new endpoint starts alive, with a fresh update timestamp, and is not
+    // yet marked as a gossiper.
+    EndPointState(HeartBeatState hbState) 
+    { 
+        hbState_ = hbState; 
+        updateTimestamp_ = System.currentTimeMillis(); 
+        isAlive_ = true; 
+        isAGossiper_ = false;
+    }
+        
+    HeartBeatState getHeartBeatState()
+    {
+        return hbState_;
+    }
+    
+    // Replaces the heartbeat and records the local time of the change.
+    synchronized void setHeartBeatState(HeartBeatState hbState)
+    {
+        updateTimestamp();
+        hbState_ = hbState;
+    }
+    
+    // Returns the application state for the given key, or null if absent.
+    public ApplicationState getApplicationState(String key)
+    {
+        return applicationState_.get(key);
+    }
+    
+    // NOTE(review): returns the internal mutable map directly; callers can
+    // modify this endpoint's state through it.
+    public Map<String, ApplicationState> getApplicationState()
+    {
+        return applicationState_;
+    }
+    
+    // Adds or replaces the application state stored under the given key.
+    void addApplicationState(String key, ApplicationState appState)
+    {        
+        applicationState_.put(key, appState);        
+    }
+
+    /* getters and setters */
+    long getUpdateTimestamp()
+    {
+        return updateTimestamp_;
+    }
+    
+    synchronized void updateTimestamp()
+    {
+        updateTimestamp_ = System.currentTimeMillis();
+    }
+    
+    public boolean isAlive()
+    {        
+        return isAlive_;
+    }
+
+    synchronized void isAlive(boolean value)
+    {        
+        isAlive_ = value;        
+    }
+
+    
+    boolean isAGossiper()
+    {        
+        return isAGossiper_;
+    }
+
+    synchronized void isAGossiper(boolean value)
+    {                
+        //isAlive_ = false;
+        isAGossiper_ = value;        
+    }
+}
+
+class EndPointStateSerializer implements ICompactSerializer<EndPointState>
+{
+    private static Logger logger_ = Logger.getLogger(EndPointStateSerializer.class);
+    
+    // Serializes the heartbeat followed by up to `size` (key, ApplicationState)
+    // pairs, stopping early if the next entry would exceed the gossip MTU.
+    public void serialize(EndPointState epState, DataOutputStream dos) throws IOException
+    {
+        /* These are for estimating whether we overshoot the MTU limit */
+        int estimate = 0;
+
+        /* serialize the HeartBeatState */
+        HeartBeatState hbState = epState.getHeartBeatState();
+        HeartBeatState.serializer().serialize(hbState, dos);
+
+        /* serialize the map of ApplicationState objects */
+        int size = epState.applicationState_.size();
+        // NOTE(review): the full count is written even though entries may be
+        // dropped below to respect the MTU; deserialize() compensates by
+        // checking available() == 0 before each read.
+        dos.writeInt(size);
+        if ( size > 0 )
+        {   
+            Set<String> keys = epState.applicationState_.keySet();
+            for( String key : keys )
+            {
+                // Stop if the remaining budget is smaller than the size of the
+                // previously written entry (our estimate for the next one).
+                if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
+                {
+                    logger_.info("@@@@ Breaking out to respect the MTU size in EndPointState serializer. Estimate is " + estimate + " @@@@");
+                    break;
+                }
+            
+                ApplicationState appState = epState.applicationState_.get(key);
+                if ( appState != null )
+                {
+                    int pre = dos.size();
+                    dos.writeUTF(key);
+                    ApplicationState.serializer().serialize(appState, dos);                    
+                    int post = dos.size();
+                    // Size of the entry just written; used as the estimate above.
+                    estimate = post - pre;
+                }                
+            }
+        }
+    }
+
+    // Reads the heartbeat and then up to appStateSize entries; the serializer
+    // may have truncated the stream to respect the MTU, so fewer entries than
+    // the declared count may actually be present.
+    public EndPointState deserialize(DataInputStream dis) throws IOException
+    {
+        HeartBeatState hbState = HeartBeatState.serializer().deserialize(dis);
+        EndPointState epState = new EndPointState(hbState);               
+
+        int appStateSize = dis.readInt();
+        for ( int i = 0; i < appStateSize; ++i )
+        {
+            // Guard against truncation by the MTU check in serialize().
+            if ( dis.available() == 0 )
+            {
+                break;
+            }
+            
+            String key = dis.readUTF();    
+            ApplicationState appState = ApplicationState.serializer().deserialize(dis);            
+            epState.addApplicationState(key, appState);            
+        }
+        return epState;
+    }
+}