You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC
svn commit: r799331 [12/29] - in /incubator/cassandra/trunk: ./
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java Thu Jul 30 15:30:21 2009
@@ -1,292 +1,292 @@
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
-
- import org.apache.log4j.Logger;
-
- import org.apache.cassandra.locator.TokenMetadata;
- import org.apache.cassandra.net.EndPoint;
- import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.utils.LogUtil;
-
-
-/**
- * This class performs the exact opposite of the
- * operations of the BootStrapper class. Given
- * a bunch of nodes that need to move it determines
- * who they need to hand off data in terms of ranges.
-*/
-public class LeaveJoinProtocolImpl implements Runnable
-{
- private static Logger logger_ = Logger.getLogger(LeaveJoinProtocolImpl.class);
-
- /* endpoints that are to be moved. */
- protected EndPoint[] targets_ = new EndPoint[0];
- /* position where they need to be moved */
- protected final Token[] tokens_;
- /* token metadata information */
- protected TokenMetadata tokenMetadata_ = null;
-
- public LeaveJoinProtocolImpl(EndPoint[] targets, Token[] tokens)
- {
- targets_ = targets;
- tokens_ = tokens;
- tokenMetadata_ = StorageService.instance().getTokenMetadata();
- }
-
- public void run()
- {
- try
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Beginning leave/join process for ...");
- /* copy the token to endpoint map */
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- /* copy the endpoint to token map */
- Map<EndPoint, Token> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
-
- Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
- Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
- if (logger_.isDebugEnabled())
- logger_.debug("Total number of old ranges " + oldRanges.length);
- /* Calculate the list of nodes that handle the old ranges */
- Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges);
-
- /* Remove the tokens of the nodes leaving the ring */
- Set<Token> tokens = getTokensForLeavingNodes();
- oldTokens.removeAll(tokens);
- Range[] rangesAfterNodesLeave = StorageService.instance().getAllRanges(oldTokens);
- /* Get expanded range to initial range mapping */
- Map<Range, List<Range>> expandedRangeToOldRangeMap = getExpandedRangeToOldRangeMapping(oldRanges, rangesAfterNodesLeave);
- /* add the new token positions to the old tokens set */
- for (Token token : tokens_)
- oldTokens.add(token);
- Range[] rangesAfterNodesJoin = StorageService.instance().getAllRanges(oldTokens);
- /* replace the ranges that were split with the split ranges in the old configuration */
- addSplitRangesToOldConfiguration(oldRangeToEndPointMap, rangesAfterNodesJoin);
-
- /* Re-calculate the new ranges after the new token positions are added */
- Range[] newRanges = StorageService.instance().getAllRanges(oldTokens);
- /* Remove the old locations from tokenToEndPointMap and add the new locations they are moving to */
- for ( int i = 0; i < targets_.length; ++i )
- {
- tokenToEndPointMap.remove( endpointToTokenMap.get(targets_[i]) );
- tokenToEndPointMap.put(tokens_[i], targets_[i]);
- }
- /* Calculate the list of nodes that handle the new ranges */
- Map<Range, List<EndPoint>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges, tokenToEndPointMap);
- /* Remove any expanded ranges and replace them with ranges whose aggregate is the expanded range in the new configuration. */
- removeExpandedRangesFromNewConfiguration(newRangeToEndPointMap, expandedRangeToOldRangeMap);
- /* Calculate ranges that need to be sent and from whom to where */
- Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = LeaveJoinProtocolHelper.getRangeSourceTargetInfo(oldRangeToEndPointMap, newRangeToEndPointMap);
- /* For debug purposes only */
- Set<Range> ranges = rangesWithSourceTarget.keySet();
- for ( Range range : ranges )
- {
- System.out.print("RANGE: " + range + ":: ");
- List<BootstrapSourceTarget> infos = rangesWithSourceTarget.get(range);
- for ( BootstrapSourceTarget info : infos )
- {
- System.out.print(info);
- System.out.print(" ");
- }
- System.out.println(System.getProperty("line.separator"));
- }
- /* Send messages to respective folks to stream data over to the new nodes being bootstrapped */
- LeaveJoinProtocolHelper.assignWork(rangesWithSourceTarget);
- }
- catch ( Throwable th )
- {
- logger_.warn(LogUtil.throwableToString(th));
- }
- }
-
- /**
- * This method figures out the ranges that have been split and
- * replaces them with the split range.
- * @param oldRangeToEndPointMap old range mapped to their replicas.
- * @param rangesAfterNodesJoin ranges after the nodes have joined at
- * their respective position.
- */
- private void addSplitRangesToOldConfiguration(Map<Range, List<EndPoint>> oldRangeToEndPointMap, Range[] rangesAfterNodesJoin)
- {
- /*
- * Find the ranges that are split. Maintain a mapping between
- * the range being split and the list of subranges.
- */
- Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRangeToEndPointMap.keySet().toArray( new Range[0] ), tokens_);
- /* Mapping of split ranges to the list of endpoints responsible for the range */
- Map<Range, List<EndPoint>> replicasForSplitRanges = new HashMap<Range, List<EndPoint>>();
- Set<Range> rangesSplit = splitRanges.keySet();
- for ( Range splitRange : rangesSplit )
- {
- replicasForSplitRanges.put( splitRange, oldRangeToEndPointMap.get(splitRange) );
- }
- /* Remove the ranges that are split. */
- for ( Range splitRange : rangesSplit )
- {
- oldRangeToEndPointMap.remove(splitRange);
- }
-
- /* Add the subranges of the split range to the map with the same replica set. */
- for ( Range splitRange : rangesSplit )
- {
- List<Range> subRanges = splitRanges.get(splitRange);
- List<EndPoint> replicas = replicasForSplitRanges.get(splitRange);
- for ( Range subRange : subRanges )
- {
- /* Make sure we clone or else we are hammered. */
- oldRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
- }
- }
- }
-
- /**
- * Reset the newRangeToEndPointMap and replace the expanded range
- * with the ranges whose aggregate is the expanded range. This happens
- * only when nodes leave the ring to migrate to a different position.
- *
- * @param newRangeToEndPointMap all new ranges mapped to the replicas
- * responsible for those ranges.
- * @param expandedRangeToOldRangeMap mapping between the expanded ranges
- * and the ranges whose aggregate is the expanded range.
- */
- private void removeExpandedRangesFromNewConfiguration(Map<Range, List<EndPoint>> newRangeToEndPointMap, Map<Range, List<Range>> expandedRangeToOldRangeMap)
- {
- /* Get the replicas for the expanded ranges */
- Map<Range, List<EndPoint>> replicasForExpandedRanges = new HashMap<Range, List<EndPoint>>();
- Set<Range> expandedRanges = expandedRangeToOldRangeMap.keySet();
- for ( Range expandedRange : expandedRanges )
- {
- replicasForExpandedRanges.put( expandedRange, newRangeToEndPointMap.get(expandedRange) );
- newRangeToEndPointMap.remove(expandedRange);
- }
- /* replace the expanded ranges in the newRangeToEndPointMap with the subRanges */
- for ( Range expandedRange : expandedRanges )
- {
- List<Range> subRanges = expandedRangeToOldRangeMap.get(expandedRange);
- List<EndPoint> replicas = replicasForExpandedRanges.get(expandedRange);
- for ( Range subRange : subRanges )
- {
- newRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
- }
- }
- }
-
- private Set<Token> getTokensForLeavingNodes()
- {
- Set<Token> tokens = new HashSet<Token>();
- for ( EndPoint target : targets_ )
- {
- tokens.add(tokenMetadata_.getToken(target));
- }
- return tokens;
- }
-
- /**
- * Here we are removing the nodes that need to leave the
- * ring and trying to calculate what the ranges would look
- * like w/o them. e.g. if we remove two nodes A and D from
- * the ring and the order of nodes on the ring is A, B, C
- * and D. When B is removed the range of C is the old range
- * of C and the old range of B. We want a mapping from old
- * range of B to new range of B. We have
- * A----B----C----D----E----F----G and we remove b and e
- * then we want a mapping from (a, c] --> (a,b], (b, c] and
- * (d, f] --> (d, e], (d,f].
- * @param oldRanges ranges with the previous configuration
- * @param newRanges ranges with the target endpoints removed.
- * @return map of expanded range to the list whose aggregate is
- * the expanded range.
- */
- protected static Map<Range, List<Range>> getExpandedRangeToOldRangeMapping(Range[] oldRanges, Range[] newRanges)
- {
- Map<Range, List<Range>> map = new HashMap<Range, List<Range>>();
- List<Range> oRanges = new ArrayList<Range>();
- Collections.addAll(oRanges, oldRanges);
- List<Range> nRanges = new ArrayList<Range>();
- Collections.addAll(nRanges, newRanges);
-
- /*
- * Remove the ranges that are the same.
- * Now we will be left with the expanded
- * ranges in the nRanges list and the
- * smaller ranges in the oRanges list.
- */
- for( Range oRange : oldRanges )
- {
- boolean bVal = nRanges.remove(oRange);
- if ( bVal )
- oRanges.remove(oRange);
- }
-
- int nSize = nRanges.size();
- int oSize = oRanges.size();
- /*
- * Establish the mapping between expanded ranges
- * to the smaller ranges whose aggregate is the
- * expanded range.
- */
- for ( int i = 0; i < nSize; ++i )
- {
- Range nRange = nRanges.get(i);
- for ( int j = 0; j < oSize; ++j )
- {
- Range oRange = oRanges.get(j);
- if ( nRange.contains(oRange.right()) )
- {
- List<Range> smallerRanges = map.get(nRange);
- if ( smallerRanges == null )
- {
- smallerRanges = new ArrayList<Range>();
- map.put(nRange, smallerRanges);
- }
- smallerRanges.add(oRange);
- continue;
- }
- }
- }
-
- return map;
- }
-
- public static void main(String[] args) throws Throwable
- {
- StorageService ss = StorageService.instance();
- ss.updateTokenMetadata(new BigIntegerToken("3"), new EndPoint("A", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("6"), new EndPoint("B", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("9"), new EndPoint("C", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("12"), new EndPoint("D", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("15"), new EndPoint("E", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("18"), new EndPoint("F", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("21"), new EndPoint("G", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("24"), new EndPoint("H", 7000));
-
- Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new Token[]{new BigIntegerToken("22"), new BigIntegerToken("23")} );
- runnable.run();
- }
-}
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+
+ import org.apache.log4j.Logger;
+
+ import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.net.EndPoint;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.LogUtil;
+
+
+/**
+ * This class performs the exact opposite of the
+ * operations of the BootStrapper class. Given
+ * a bunch of nodes that need to move it determines
+ * who they need to hand off data in terms of ranges.
+*/
+public class LeaveJoinProtocolImpl implements Runnable
+{
+ private static Logger logger_ = Logger.getLogger(LeaveJoinProtocolImpl.class);
+
+ /* endpoints that are to be moved. */
+ protected EndPoint[] targets_ = new EndPoint[0];
+ /* position where they need to be moved */
+ protected final Token[] tokens_;
+ /* token metadata information */
+ protected TokenMetadata tokenMetadata_ = null;
+
+ public LeaveJoinProtocolImpl(EndPoint[] targets, Token[] tokens)
+ {
+ targets_ = targets;
+ tokens_ = tokens;
+ tokenMetadata_ = StorageService.instance().getTokenMetadata();
+ }
+
+ public void run()
+ {
+ try
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Beginning leave/join process for ...");
+ /* copy the token to endpoint map */
+ Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ /* copy the endpoint to token map */
+ Map<EndPoint, Token> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
+
+ Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
+ Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
+ if (logger_.isDebugEnabled())
+ logger_.debug("Total number of old ranges " + oldRanges.length);
+ /* Calculate the list of nodes that handle the old ranges */
+ Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges);
+
+ /* Remove the tokens of the nodes leaving the ring */
+ Set<Token> tokens = getTokensForLeavingNodes();
+ oldTokens.removeAll(tokens);
+ Range[] rangesAfterNodesLeave = StorageService.instance().getAllRanges(oldTokens);
+ /* Get expanded range to initial range mapping */
+ Map<Range, List<Range>> expandedRangeToOldRangeMap = getExpandedRangeToOldRangeMapping(oldRanges, rangesAfterNodesLeave);
+ /* add the new token positions to the old tokens set */
+ for (Token token : tokens_)
+ oldTokens.add(token);
+ Range[] rangesAfterNodesJoin = StorageService.instance().getAllRanges(oldTokens);
+ /* replace the ranges that were split with the split ranges in the old configuration */
+ addSplitRangesToOldConfiguration(oldRangeToEndPointMap, rangesAfterNodesJoin);
+
+ /* Re-calculate the new ranges after the new token positions are added */
+ Range[] newRanges = StorageService.instance().getAllRanges(oldTokens);
+ /* Remove the old locations from tokenToEndPointMap and add the new locations they are moving to */
+ for ( int i = 0; i < targets_.length; ++i )
+ {
+ tokenToEndPointMap.remove( endpointToTokenMap.get(targets_[i]) );
+ tokenToEndPointMap.put(tokens_[i], targets_[i]);
+ }
+ /* Calculate the list of nodes that handle the new ranges */
+ Map<Range, List<EndPoint>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges, tokenToEndPointMap);
+ /* Remove any expanded ranges and replace them with ranges whose aggregate is the expanded range in the new configuration. */
+ removeExpandedRangesFromNewConfiguration(newRangeToEndPointMap, expandedRangeToOldRangeMap);
+ /* Calculate ranges that need to be sent and from whom to where */
+ Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = LeaveJoinProtocolHelper.getRangeSourceTargetInfo(oldRangeToEndPointMap, newRangeToEndPointMap);
+ /* For debug purposes only */
+ Set<Range> ranges = rangesWithSourceTarget.keySet();
+ for ( Range range : ranges )
+ {
+ System.out.print("RANGE: " + range + ":: ");
+ List<BootstrapSourceTarget> infos = rangesWithSourceTarget.get(range);
+ for ( BootstrapSourceTarget info : infos )
+ {
+ System.out.print(info);
+ System.out.print(" ");
+ }
+ System.out.println(System.getProperty("line.separator"));
+ }
+ /* Send messages to respective folks to stream data over to the new nodes being bootstrapped */
+ LeaveJoinProtocolHelper.assignWork(rangesWithSourceTarget);
+ }
+ catch ( Throwable th )
+ {
+ logger_.warn(LogUtil.throwableToString(th));
+ }
+ }
+
+ /**
+ * This method figures out the ranges that have been split and
+ * replaces them with the split range.
+ * @param oldRangeToEndPointMap old range mapped to their replicas.
+ * @param rangesAfterNodesJoin ranges after the nodes have joined at
+ * their respective position.
+ */
+ private void addSplitRangesToOldConfiguration(Map<Range, List<EndPoint>> oldRangeToEndPointMap, Range[] rangesAfterNodesJoin)
+ {
+ /*
+ * Find the ranges that are split. Maintain a mapping between
+ * the range being split and the list of subranges.
+ */
+ Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRangeToEndPointMap.keySet().toArray( new Range[0] ), tokens_);
+ /* Mapping of split ranges to the list of endpoints responsible for the range */
+ Map<Range, List<EndPoint>> replicasForSplitRanges = new HashMap<Range, List<EndPoint>>();
+ Set<Range> rangesSplit = splitRanges.keySet();
+ for ( Range splitRange : rangesSplit )
+ {
+ replicasForSplitRanges.put( splitRange, oldRangeToEndPointMap.get(splitRange) );
+ }
+ /* Remove the ranges that are split. */
+ for ( Range splitRange : rangesSplit )
+ {
+ oldRangeToEndPointMap.remove(splitRange);
+ }
+
+ /* Add the subranges of the split range to the map with the same replica set. */
+ for ( Range splitRange : rangesSplit )
+ {
+ List<Range> subRanges = splitRanges.get(splitRange);
+ List<EndPoint> replicas = replicasForSplitRanges.get(splitRange);
+ for ( Range subRange : subRanges )
+ {
+ /* Make sure we clone or else we are hammered. */
+ oldRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
+ }
+ }
+ }
+
+ /**
+ * Reset the newRangeToEndPointMap and replace the expanded range
+ * with the ranges whose aggregate is the expanded range. This happens
+ * only when nodes leave the ring to migrate to a different position.
+ *
+ * @param newRangeToEndPointMap all new ranges mapped to the replicas
+ * responsible for those ranges.
+ * @param expandedRangeToOldRangeMap mapping between the expanded ranges
+ * and the ranges whose aggregate is the expanded range.
+ */
+ private void removeExpandedRangesFromNewConfiguration(Map<Range, List<EndPoint>> newRangeToEndPointMap, Map<Range, List<Range>> expandedRangeToOldRangeMap)
+ {
+ /* Get the replicas for the expanded ranges */
+ Map<Range, List<EndPoint>> replicasForExpandedRanges = new HashMap<Range, List<EndPoint>>();
+ Set<Range> expandedRanges = expandedRangeToOldRangeMap.keySet();
+ for ( Range expandedRange : expandedRanges )
+ {
+ replicasForExpandedRanges.put( expandedRange, newRangeToEndPointMap.get(expandedRange) );
+ newRangeToEndPointMap.remove(expandedRange);
+ }
+ /* replace the expanded ranges in the newRangeToEndPointMap with the subRanges */
+ for ( Range expandedRange : expandedRanges )
+ {
+ List<Range> subRanges = expandedRangeToOldRangeMap.get(expandedRange);
+ List<EndPoint> replicas = replicasForExpandedRanges.get(expandedRange);
+ for ( Range subRange : subRanges )
+ {
+ newRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
+ }
+ }
+ }
+
+ private Set<Token> getTokensForLeavingNodes()
+ {
+ Set<Token> tokens = new HashSet<Token>();
+ for ( EndPoint target : targets_ )
+ {
+ tokens.add(tokenMetadata_.getToken(target));
+ }
+ return tokens;
+ }
+
+ /**
+ * Here we are removing the nodes that need to leave the
+ * ring and trying to calculate what the ranges would look
+ * like w/o them. e.g. if we remove two nodes A and D from
+ * the ring and the order of nodes on the ring is A, B, C
+ * and D. When B is removed the range of C is the old range
+ * of C and the old range of B. We want a mapping from old
+ * range of B to new range of B. We have
+ * A----B----C----D----E----F----G and we remove b and e
+ * then we want a mapping from (a, c] --> (a,b], (b, c] and
+ * (d, f] --> (d, e], (d,f].
+ * @param oldRanges ranges with the previous configuration
+ * @param newRanges ranges with the target endpoints removed.
+ * @return map of expanded range to the list whose aggregate is
+ * the expanded range.
+ */
+ protected static Map<Range, List<Range>> getExpandedRangeToOldRangeMapping(Range[] oldRanges, Range[] newRanges)
+ {
+ Map<Range, List<Range>> map = new HashMap<Range, List<Range>>();
+ List<Range> oRanges = new ArrayList<Range>();
+ Collections.addAll(oRanges, oldRanges);
+ List<Range> nRanges = new ArrayList<Range>();
+ Collections.addAll(nRanges, newRanges);
+
+ /*
+ * Remove the ranges that are the same.
+ * Now we will be left with the expanded
+ * ranges in the nRanges list and the
+ * smaller ranges in the oRanges list.
+ */
+ for( Range oRange : oldRanges )
+ {
+ boolean bVal = nRanges.remove(oRange);
+ if ( bVal )
+ oRanges.remove(oRange);
+ }
+
+ int nSize = nRanges.size();
+ int oSize = oRanges.size();
+ /*
+ * Establish the mapping between expanded ranges
+ * to the smaller ranges whose aggregate is the
+ * expanded range.
+ */
+ for ( int i = 0; i < nSize; ++i )
+ {
+ Range nRange = nRanges.get(i);
+ for ( int j = 0; j < oSize; ++j )
+ {
+ Range oRange = oRanges.get(j);
+ if ( nRange.contains(oRange.right()) )
+ {
+ List<Range> smallerRanges = map.get(nRange);
+ if ( smallerRanges == null )
+ {
+ smallerRanges = new ArrayList<Range>();
+ map.put(nRange, smallerRanges);
+ }
+ smallerRanges.add(oRange);
+ continue;
+ }
+ }
+ }
+
+ return map;
+ }
+
+ public static void main(String[] args) throws Throwable
+ {
+ StorageService ss = StorageService.instance();
+ ss.updateTokenMetadata(new BigIntegerToken("3"), new EndPoint("A", 7000));
+ ss.updateTokenMetadata(new BigIntegerToken("6"), new EndPoint("B", 7000));
+ ss.updateTokenMetadata(new BigIntegerToken("9"), new EndPoint("C", 7000));
+ ss.updateTokenMetadata(new BigIntegerToken("12"), new EndPoint("D", 7000));
+ ss.updateTokenMetadata(new BigIntegerToken("15"), new EndPoint("E", 7000));
+ ss.updateTokenMetadata(new BigIntegerToken("18"), new EndPoint("F", 7000));
+ ss.updateTokenMetadata(new BigIntegerToken("21"), new EndPoint("G", 7000));
+ ss.updateTokenMetadata(new BigIntegerToken("24"), new EndPoint("H", 7000));
+
+ Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new Token[]{new BigIntegerToken("22"), new BigIntegerToken("23")} );
+ runnable.run();
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java Thu Jul 30 15:30:21 2009
@@ -1,129 +1,129 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import java.io.UnsupportedEncodingException;
-import java.text.Collator;
-import java.util.Comparator;
-import java.util.Locale;
-import java.util.Random;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-
-public class OrderPreservingPartitioner implements IPartitioner
-{
- // TODO make locale configurable. But don't just leave it up to the OS or you could really screw
- // people over if they deploy on nodes with different OS locales.
- static final Collator collator = Collator.getInstance(new Locale("en", "US"));
-
- private static final Comparator<String> comparator = new Comparator<String>() {
- public int compare(String o1, String o2)
- {
- return collator.compare(o1, o2);
- }
- };
- private static final Comparator<String> reverseComparator = new Comparator<String>() {
- public int compare(String o1, String o2)
- {
- return -comparator.compare(o1, o2);
- }
- };
-
- public String decorateKey(String key)
- {
- return key;
- }
-
- public String undecorateKey(String decoratedKey)
- {
- return decoratedKey;
- }
-
- public Comparator<String> getDecoratedKeyComparator()
- {
- return comparator;
- }
-
- public Comparator<String> getReverseDecoratedKeyComparator()
- {
- return reverseComparator;
- }
-
- public StringToken getDefaultToken()
- {
- String initialToken = DatabaseDescriptor.getInitialToken();
- if (initialToken != null)
- return new StringToken(initialToken);
-
- // generate random token
- String chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
- Random r = new Random();
- StringBuilder buffer = new StringBuilder();
- for (int j = 0; j < 16; j++) {
- buffer.append(chars.charAt(r.nextInt(chars.length())));
- }
- return new StringToken(buffer.toString());
- }
-
- private final Token.TokenFactory<String> tokenFactory = new Token.TokenFactory<String>() {
- public byte[] toByteArray(Token<String> stringToken)
- {
- try
- {
- return stringToken.token.getBytes("UTF-8");
- }
- catch (UnsupportedEncodingException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public Token<String> fromByteArray(byte[] bytes)
- {
- try
- {
- return new StringToken(new String(bytes, "UTF-8"));
- }
- catch (UnsupportedEncodingException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public String toString(Token<String> stringToken)
- {
- return stringToken.token;
- }
-
- public Token<String> fromString(String string)
- {
- return new StringToken(string);
- }
- };
-
- public Token.TokenFactory<String> getTokenFactory()
- {
- return tokenFactory;
- }
-
- public Token getInitialToken(String key)
- {
- return new StringToken(key);
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.UnsupportedEncodingException;
+import java.text.Collator;
+import java.util.Comparator;
+import java.util.Locale;
+import java.util.Random;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+public class OrderPreservingPartitioner implements IPartitioner
+{
+ // TODO make locale configurable. But don't just leave it up to the OS or you could really screw
+ // people over if they deploy on nodes with different OS locales.
+ static final Collator collator = Collator.getInstance(new Locale("en", "US"));
+
+ private static final Comparator<String> comparator = new Comparator<String>() {
+ public int compare(String o1, String o2)
+ {
+ return collator.compare(o1, o2);
+ }
+ };
+ private static final Comparator<String> reverseComparator = new Comparator<String>() {
+ public int compare(String o1, String o2)
+ {
+ return -comparator.compare(o1, o2);
+ }
+ };
+
+ public String decorateKey(String key)
+ {
+ return key;
+ }
+
+ public String undecorateKey(String decoratedKey)
+ {
+ return decoratedKey;
+ }
+
+ public Comparator<String> getDecoratedKeyComparator()
+ {
+ return comparator;
+ }
+
+ public Comparator<String> getReverseDecoratedKeyComparator()
+ {
+ return reverseComparator;
+ }
+
+ public StringToken getDefaultToken()
+ {
+ String initialToken = DatabaseDescriptor.getInitialToken();
+ if (initialToken != null)
+ return new StringToken(initialToken);
+
+ // generate random token
+ String chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+ Random r = new Random();
+ StringBuilder buffer = new StringBuilder();
+ for (int j = 0; j < 16; j++) {
+ buffer.append(chars.charAt(r.nextInt(chars.length())));
+ }
+ return new StringToken(buffer.toString());
+ }
+
+ private final Token.TokenFactory<String> tokenFactory = new Token.TokenFactory<String>() {
+ public byte[] toByteArray(Token<String> stringToken)
+ {
+ try
+ {
+ return stringToken.token.getBytes("UTF-8");
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Token<String> fromByteArray(byte[] bytes)
+ {
+ try
+ {
+ return new StringToken(new String(bytes, "UTF-8"));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String toString(Token<String> stringToken)
+ {
+ return stringToken.token;
+ }
+
+ public Token<String> fromString(String string)
+ {
+ return new StringToken(string);
+ }
+ };
+
+ public Token.TokenFactory<String> getTokenFactory()
+ {
+ return tokenFactory;
+ }
+
+ public Token getInitialToken(String key)
+ {
+ return new StringToken(key);
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java Thu Jul 30 15:30:21 2009
@@ -1,121 +1,121 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import java.math.BigInteger;
-import java.util.Comparator;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.GuidGenerator;
-
-/**
- * This class generates a BigIntegerToken using MD5 hash.
- */
-public class RandomPartitioner implements IPartitioner
-{
- private static final Comparator<String> comparator = new Comparator<String>()
- {
- public int compare(String o1, String o2)
- {
- String[] split1 = o1.split(":", 2);
- String[] split2 = o2.split(":", 2);
- BigInteger i1 = new BigInteger(split1[0]);
- BigInteger i2 = new BigInteger(split2[0]);
- int v = i1.compareTo(i2);
- if (v != 0) {
- return v;
- }
- return split1[1].compareTo(split2[1]);
- }
- };
- private static final Comparator<String> rcomparator = new Comparator<String>()
- {
- public int compare(String o1, String o2)
- {
- return -comparator.compare(o1, o2);
- }
- };
-
- public String decorateKey(String key)
- {
- return FBUtilities.hash(key).toString() + ":" + key;
- }
-
- public String undecorateKey(String decoratedKey)
- {
- return decoratedKey.split(":", 2)[1];
- }
-
- public Comparator<String> getDecoratedKeyComparator()
- {
- return comparator;
- }
-
- public Comparator<String> getReverseDecoratedKeyComparator()
- {
- return rcomparator;
- }
-
- public BigIntegerToken getDefaultToken()
- {
- String initialToken = DatabaseDescriptor.getInitialToken();
- if (initialToken != null)
- return new BigIntegerToken(new BigInteger(initialToken));
-
- // generate random token
- String guid = GuidGenerator.guid();
- BigInteger token = FBUtilities.hash(guid);
- if ( token.signum() == -1 )
- token = token.multiply(BigInteger.valueOf(-1L));
- return new BigIntegerToken(token);
- }
-
- private final Token.TokenFactory<BigInteger> tokenFactory = new Token.TokenFactory<BigInteger>() {
- public byte[] toByteArray(Token<BigInteger> bigIntegerToken)
- {
- return bigIntegerToken.token.toByteArray();
- }
-
- public Token<BigInteger> fromByteArray(byte[] bytes)
- {
- return new BigIntegerToken(new BigInteger(bytes));
- }
-
- public String toString(Token<BigInteger> bigIntegerToken)
- {
- return bigIntegerToken.token.toString();
- }
-
- public Token<BigInteger> fromString(String string)
- {
- return new BigIntegerToken(new BigInteger(string));
- }
- };
-
- public Token.TokenFactory<BigInteger> getTokenFactory()
- {
- return tokenFactory;
- }
-
- public Token getInitialToken(String key)
- {
- return new BigIntegerToken(FBUtilities.hash(key));
- }
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.math.BigInteger;
+import java.util.Comparator;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.GuidGenerator;
+
+/**
+ * This class generates a BigIntegerToken using MD5 hash.
+ */
+public class RandomPartitioner implements IPartitioner
+{
+ private static final Comparator<String> comparator = new Comparator<String>()
+ {
+ public int compare(String o1, String o2)
+ {
+ String[] split1 = o1.split(":", 2);
+ String[] split2 = o2.split(":", 2);
+ BigInteger i1 = new BigInteger(split1[0]);
+ BigInteger i2 = new BigInteger(split2[0]);
+ int v = i1.compareTo(i2);
+ if (v != 0) {
+ return v;
+ }
+ return split1[1].compareTo(split2[1]);
+ }
+ };
+ private static final Comparator<String> rcomparator = new Comparator<String>()
+ {
+ public int compare(String o1, String o2)
+ {
+ return -comparator.compare(o1, o2);
+ }
+ };
+
+ public String decorateKey(String key)
+ {
+ return FBUtilities.hash(key).toString() + ":" + key;
+ }
+
+ public String undecorateKey(String decoratedKey)
+ {
+ return decoratedKey.split(":", 2)[1];
+ }
+
+ public Comparator<String> getDecoratedKeyComparator()
+ {
+ return comparator;
+ }
+
+ public Comparator<String> getReverseDecoratedKeyComparator()
+ {
+ return rcomparator;
+ }
+
+ public BigIntegerToken getDefaultToken()
+ {
+ String initialToken = DatabaseDescriptor.getInitialToken();
+ if (initialToken != null)
+ return new BigIntegerToken(new BigInteger(initialToken));
+
+ // generate random token
+ String guid = GuidGenerator.guid();
+ BigInteger token = FBUtilities.hash(guid);
+ if ( token.signum() == -1 )
+ token = token.multiply(BigInteger.valueOf(-1L));
+ return new BigIntegerToken(token);
+ }
+
+ private final Token.TokenFactory<BigInteger> tokenFactory = new Token.TokenFactory<BigInteger>() {
+ public byte[] toByteArray(Token<BigInteger> bigIntegerToken)
+ {
+ return bigIntegerToken.token.toByteArray();
+ }
+
+ public Token<BigInteger> fromByteArray(byte[] bytes)
+ {
+ return new BigIntegerToken(new BigInteger(bytes));
+ }
+
+ public String toString(Token<BigInteger> bigIntegerToken)
+ {
+ return bigIntegerToken.token.toString();
+ }
+
+ public Token<BigInteger> fromString(String string)
+ {
+ return new BigIntegerToken(new BigInteger(string));
+ }
+ };
+
+ public Token.TokenFactory<BigInteger> getTokenFactory()
+ {
+ return tokenFactory;
+ }
+
+ public Token getInitialToken(String key)
+ {
+ return new BigIntegerToken(FBUtilities.hash(key));
+ }
}
\ No newline at end of file
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java Thu Jul 30 15:30:21 2009
@@ -1,186 +1,186 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.math.BigInteger;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.service.StorageService;
-
-
-/**
- * A representation of the range that a node is responsible for on the DHT ring.
- *
- * A Range is responsible for the tokens between [left, right).
- *
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class Range implements Comparable<Range>, Serializable
-{
- private static ICompactSerializer<Range> serializer_;
- static
- {
- serializer_ = new RangeSerializer();
- }
-
- public static ICompactSerializer<Range> serializer()
- {
- return serializer_;
- }
-
- private final Token left_;
- private final Token right_;
-
- public Range(Token left, Token right)
- {
- left_ = left;
- right_ = right;
- }
-
- /**
- * Returns the left endpoint of a range.
- * @return left endpoint
- */
- public Token left()
- {
- return left_;
- }
-
- /**
- * Returns the right endpoint of a range.
- * @return right endpoint
- */
- public Token right()
- {
- return right_;
- }
-
- /**
- * Helps determine if a given point on the DHT ring is contained
- * in the range in question.
- * @param bi point in question
- * @return true if the point contains within the range else false.
- */
- public boolean contains(Token bi)
- {
- if ( left_.compareTo(right_) > 0 )
- {
- /*
- * left is greater than right we are wrapping around.
- * So if the interval is [a,b) where a > b then we have
- * 3 cases one of which holds for any given token k.
- * (1) k > a -- return true
- * (2) k < b -- return true
- * (3) b < k < a -- return false
- */
- if ( bi.compareTo(left_) >= 0 )
- return true;
- else return right_.compareTo(bi) > 0;
- }
- else if ( left_.compareTo(right_) < 0 )
- {
- /*
- * This is the range [a, b) where a < b.
- */
- return ( bi.compareTo(left_) >= 0 && right_.compareTo(bi) > 0 );
- }
- else
- {
- return true;
- }
- }
-
- /**
- * Tells if the given range is a wrap around.
- * @param range
- * @return
- */
- private static boolean isWrapAround(Range range)
- {
- return range.left_.compareTo(range.right_) > 0;
- }
-
- public int compareTo(Range rhs)
- {
- /*
- * If the range represented by the "this" pointer
- * is a wrap around then it is the smaller one.
- */
- if ( isWrapAround(this) )
- return -1;
-
- if ( isWrapAround(rhs) )
- return 1;
-
- return right_.compareTo(rhs.right_);
- }
-
-
- public static boolean isTokenInRanges(Token token, List<Range> ranges)
- {
- assert ranges != null;
-
- for (Range range : ranges)
- {
- if(range.contains(token))
- {
- return true;
- }
- }
- return false;
- }
-
- public boolean equals(Object o)
- {
- if ( !(o instanceof Range) )
- return false;
- Range rhs = (Range)o;
- return left_.equals(rhs.left_) && right_.equals(rhs.right_);
- }
-
- public int hashCode()
- {
- return toString().hashCode();
- }
-
- public String toString()
- {
- return "(" + left_ + "," + right_ + "]";
- }
-}
-
-class RangeSerializer implements ICompactSerializer<Range>
-{
- public void serialize(Range range, DataOutputStream dos) throws IOException
- {
- Token.serializer().serialize(range.left(), dos);
- Token.serializer().serialize(range.right(), dos);
- }
-
- public Range deserialize(DataInputStream dis) throws IOException
- {
- return new Range(Token.serializer().deserialize(dis), Token.serializer().deserialize(dis));
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.math.BigInteger;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.service.StorageService;
+
+
+/**
+ * A representation of the range that a node is responsible for on the DHT ring.
+ *
+ * A Range is responsible for the tokens between [left, right).
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Range implements Comparable<Range>, Serializable
+{
+ private static ICompactSerializer<Range> serializer_;
+ static
+ {
+ serializer_ = new RangeSerializer();
+ }
+
+ public static ICompactSerializer<Range> serializer()
+ {
+ return serializer_;
+ }
+
+ private final Token left_;
+ private final Token right_;
+
+ public Range(Token left, Token right)
+ {
+ left_ = left;
+ right_ = right;
+ }
+
+ /**
+ * Returns the left endpoint of a range.
+ * @return left endpoint
+ */
+ public Token left()
+ {
+ return left_;
+ }
+
+ /**
+ * Returns the right endpoint of a range.
+ * @return right endpoint
+ */
+ public Token right()
+ {
+ return right_;
+ }
+
+ /**
+ * Helps determine if a given point on the DHT ring is contained
+ * in the range in question.
+ * @param bi point in question
+ * @return true if the point contains within the range else false.
+ */
+ public boolean contains(Token bi)
+ {
+ if ( left_.compareTo(right_) > 0 )
+ {
+ /*
+ * left is greater than right we are wrapping around.
+ * So if the interval is [a,b) where a > b then we have
+ * 3 cases one of which holds for any given token k.
+ * (1) k > a -- return true
+ * (2) k < b -- return true
+ * (3) b < k < a -- return false
+ */
+ if ( bi.compareTo(left_) >= 0 )
+ return true;
+ else return right_.compareTo(bi) > 0;
+ }
+ else if ( left_.compareTo(right_) < 0 )
+ {
+ /*
+ * This is the range [a, b) where a < b.
+ */
+ return ( bi.compareTo(left_) >= 0 && right_.compareTo(bi) > 0 );
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ /**
+ * Tells if the given range is a wrap around.
+ * @param range
+ * @return
+ */
+ private static boolean isWrapAround(Range range)
+ {
+ return range.left_.compareTo(range.right_) > 0;
+ }
+
+ public int compareTo(Range rhs)
+ {
+ /*
+ * If the range represented by the "this" pointer
+ * is a wrap around then it is the smaller one.
+ */
+ if ( isWrapAround(this) )
+ return -1;
+
+ if ( isWrapAround(rhs) )
+ return 1;
+
+ return right_.compareTo(rhs.right_);
+ }
+
+
+ public static boolean isTokenInRanges(Token token, List<Range> ranges)
+ {
+ assert ranges != null;
+
+ for (Range range : ranges)
+ {
+ if(range.contains(token))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean equals(Object o)
+ {
+ if ( !(o instanceof Range) )
+ return false;
+ Range rhs = (Range)o;
+ return left_.equals(rhs.left_) && right_.equals(rhs.right_);
+ }
+
+ public int hashCode()
+ {
+ return toString().hashCode();
+ }
+
+ public String toString()
+ {
+ return "(" + left_ + "," + right_ + "]";
+ }
+}
+
+class RangeSerializer implements ICompactSerializer<Range>
+{
+ public void serialize(Range range, DataOutputStream dos) throws IOException
+ {
+ Token.serializer().serialize(range.left(), dos);
+ Token.serializer().serialize(range.right(), dos);
+ }
+
+ public Range deserialize(DataInputStream dis) throws IOException
+ {
+ return new Range(Token.serializer().deserialize(dis), Token.serializer().deserialize(dis));
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java Thu Jul 30 15:30:21 2009
@@ -1,101 +1,101 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.gms;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.cassandra.io.ICompactSerializer;
-
-
-/**
- * This abstraction represents the state associated with a particular node which an
- * application wants to make available to the rest of the nodes in the cluster.
- * Whenever a piece of state needs to be disseminated to the rest of cluster wrap
- * the state in an instance of <i>ApplicationState</i> and add it to the Gossiper.
- *
- * e.g. if we want to disseminate load information for node A do the following:
- *
- * ApplicationState loadState = new ApplicationState(<string representation of load>);
- * Gossiper.instance().addApplicationState("LOAD STATE", loadState);
- *
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class ApplicationState
-{
- private static ICompactSerializer<ApplicationState> serializer_;
- static
- {
- serializer_ = new ApplicationStateSerializer();
- }
-
- int version_;
- String state_;
-
-
- ApplicationState(String state, int version)
- {
- state_ = state;
- version_ = version;
- }
-
- public static ICompactSerializer<ApplicationState> serializer()
- {
- return serializer_;
- }
-
- /**
- * Wraps the specified state into a ApplicationState instance.
- * @param state string representation of arbitrary state.
- */
- public ApplicationState(String state)
- {
- state_ = state;
- version_ = VersionGenerator.getNextVersion();
- }
-
- public String getState()
- {
- return state_;
- }
-
- int getStateVersion()
- {
- return version_;
- }
-}
-
-class ApplicationStateSerializer implements ICompactSerializer<ApplicationState>
-{
- public void serialize(ApplicationState appState, DataOutputStream dos) throws IOException
- {
- dos.writeUTF(appState.state_);
- dos.writeInt(appState.version_);
- }
-
- public ApplicationState deserialize(DataInputStream dis) throws IOException
- {
- String state = dis.readUTF();
- int version = dis.readInt();
- return new ApplicationState(state, version);
- }
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+
+
+/**
+ * This abstraction represents the state associated with a particular node which an
+ * application wants to make available to the rest of the nodes in the cluster.
+ * Whenever a piece of state needs to be disseminated to the rest of cluster wrap
+ * the state in an instance of <i>ApplicationState</i> and add it to the Gossiper.
+ *
+ * e.g. if we want to disseminate load information for node A do the following:
+ *
+ * ApplicationState loadState = new ApplicationState(<string representation of load>);
+ * Gossiper.instance().addApplicationState("LOAD STATE", loadState);
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ApplicationState
+{
+ private static ICompactSerializer<ApplicationState> serializer_;
+ static
+ {
+ serializer_ = new ApplicationStateSerializer();
+ }
+
+ int version_;
+ String state_;
+
+
+ ApplicationState(String state, int version)
+ {
+ state_ = state;
+ version_ = version;
+ }
+
+ public static ICompactSerializer<ApplicationState> serializer()
+ {
+ return serializer_;
+ }
+
+ /**
+ * Wraps the specified state into a ApplicationState instance.
+ * @param state string representation of arbitrary state.
+ */
+ public ApplicationState(String state)
+ {
+ state_ = state;
+ version_ = VersionGenerator.getNextVersion();
+ }
+
+ public String getState()
+ {
+ return state_;
+ }
+
+ int getStateVersion()
+ {
+ return version_;
+ }
+}
+
+class ApplicationStateSerializer implements ICompactSerializer<ApplicationState>
+{
+ public void serialize(ApplicationState appState, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(appState.state_);
+ dos.writeInt(appState.version_);
+ }
+
+ public ApplicationState deserialize(DataInputStream dis) throws IOException
+ {
+ String state = dis.readUTF();
+ int version = dis.readInt();
+ return new ApplicationState(state, version);
+ }
+}
+
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java Thu Jul 30 15:30:21 2009
@@ -1,184 +1,184 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.gms;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.*;
-import org.apache.cassandra.io.ICompactSerializer;
-
-import org.apache.log4j.Logger;
-
-/**
- * This abstraction represents both the HeartBeatState and the ApplicationState in an EndPointState
- * instance. Any state for a given endpoint can be retrieved from this instance.
- *
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class EndPointState
-{
- private static ICompactSerializer<EndPointState> serializer_;
- static
- {
- serializer_ = new EndPointStateSerializer();
- }
-
- HeartBeatState hbState_;
- Map<String, ApplicationState> applicationState_ = new Hashtable<String, ApplicationState>();
-
- /* fields below do not get serialized */
- long updateTimestamp_;
- boolean isAlive_;
- boolean isAGossiper_;
-
- public static ICompactSerializer<EndPointState> serializer()
- {
- return serializer_;
- }
-
- EndPointState(HeartBeatState hbState)
- {
- hbState_ = hbState;
- updateTimestamp_ = System.currentTimeMillis();
- isAlive_ = true;
- isAGossiper_ = false;
- }
-
- HeartBeatState getHeartBeatState()
- {
- return hbState_;
- }
-
- synchronized void setHeartBeatState(HeartBeatState hbState)
- {
- updateTimestamp();
- hbState_ = hbState;
- }
-
- public ApplicationState getApplicationState(String key)
- {
- return applicationState_.get(key);
- }
-
- public Map<String, ApplicationState> getApplicationState()
- {
- return applicationState_;
- }
-
- void addApplicationState(String key, ApplicationState appState)
- {
- applicationState_.put(key, appState);
- }
-
- /* getters and setters */
- long getUpdateTimestamp()
- {
- return updateTimestamp_;
- }
-
- synchronized void updateTimestamp()
- {
- updateTimestamp_ = System.currentTimeMillis();
- }
-
- public boolean isAlive()
- {
- return isAlive_;
- }
-
- synchronized void isAlive(boolean value)
- {
- isAlive_ = value;
- }
-
-
- boolean isAGossiper()
- {
- return isAGossiper_;
- }
-
- synchronized void isAGossiper(boolean value)
- {
- //isAlive_ = false;
- isAGossiper_ = value;
- }
-}
-
-class EndPointStateSerializer implements ICompactSerializer<EndPointState>
-{
- private static Logger logger_ = Logger.getLogger(EndPointStateSerializer.class);
-
- public void serialize(EndPointState epState, DataOutputStream dos) throws IOException
- {
- /* These are for estimating whether we overshoot the MTU limit */
- int estimate = 0;
-
- /* serialize the HeartBeatState */
- HeartBeatState hbState = epState.getHeartBeatState();
- HeartBeatState.serializer().serialize(hbState, dos);
-
- /* serialize the map of ApplicationState objects */
- int size = epState.applicationState_.size();
- dos.writeInt(size);
- if ( size > 0 )
- {
- Set<String> keys = epState.applicationState_.keySet();
- for( String key : keys )
- {
- if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
- {
- logger_.info("@@@@ Breaking out to respect the MTU size in EndPointState serializer. Estimate is " + estimate + " @@@@");
- break;
- }
-
- ApplicationState appState = epState.applicationState_.get(key);
- if ( appState != null )
- {
- int pre = dos.size();
- dos.writeUTF(key);
- ApplicationState.serializer().serialize(appState, dos);
- int post = dos.size();
- estimate = post - pre;
- }
- }
- }
- }
-
- public EndPointState deserialize(DataInputStream dis) throws IOException
- {
- HeartBeatState hbState = HeartBeatState.serializer().deserialize(dis);
- EndPointState epState = new EndPointState(hbState);
-
- int appStateSize = dis.readInt();
- for ( int i = 0; i < appStateSize; ++i )
- {
- if ( dis.available() == 0 )
- {
- break;
- }
-
- String key = dis.readUTF();
- ApplicationState appState = ApplicationState.serializer().deserialize(dis);
- epState.addApplicationState(key, appState);
- }
- return epState;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+import org.apache.cassandra.io.ICompactSerializer;
+
+import org.apache.log4j.Logger;
+
+/**
+ * This abstraction represents both the HeartBeatState and the ApplicationState in an EndPointState
+ * instance. Any state for a given endpoint can be retrieved from this instance.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class EndPointState
+{
+ private static ICompactSerializer<EndPointState> serializer_;
+ static
+ {
+ serializer_ = new EndPointStateSerializer();
+ }
+
+ HeartBeatState hbState_;
+ Map<String, ApplicationState> applicationState_ = new Hashtable<String, ApplicationState>();
+
+ /* fields below do not get serialized */
+ long updateTimestamp_;
+ boolean isAlive_;
+ boolean isAGossiper_;
+
+ public static ICompactSerializer<EndPointState> serializer()
+ {
+ return serializer_;
+ }
+
+ EndPointState(HeartBeatState hbState)
+ {
+ hbState_ = hbState;
+ updateTimestamp_ = System.currentTimeMillis();
+ isAlive_ = true;
+ isAGossiper_ = false;
+ }
+
+ HeartBeatState getHeartBeatState()
+ {
+ return hbState_;
+ }
+
+ synchronized void setHeartBeatState(HeartBeatState hbState)
+ {
+ updateTimestamp();
+ hbState_ = hbState;
+ }
+
+ public ApplicationState getApplicationState(String key)
+ {
+ return applicationState_.get(key);
+ }
+
+ public Map<String, ApplicationState> getApplicationState()
+ {
+ return applicationState_;
+ }
+
+ void addApplicationState(String key, ApplicationState appState)
+ {
+ applicationState_.put(key, appState);
+ }
+
+ /* getters and setters */
+ long getUpdateTimestamp()
+ {
+ return updateTimestamp_;
+ }
+
+ synchronized void updateTimestamp()
+ {
+ updateTimestamp_ = System.currentTimeMillis();
+ }
+
+ public boolean isAlive()
+ {
+ return isAlive_;
+ }
+
+ synchronized void isAlive(boolean value)
+ {
+ isAlive_ = value;
+ }
+
+
+ boolean isAGossiper()
+ {
+ return isAGossiper_;
+ }
+
+ synchronized void isAGossiper(boolean value)
+ {
+ //isAlive_ = false;
+ isAGossiper_ = value;
+ }
+}
+
+class EndPointStateSerializer implements ICompactSerializer<EndPointState>
+{
+ private static Logger logger_ = Logger.getLogger(EndPointStateSerializer.class);
+
+ public void serialize(EndPointState epState, DataOutputStream dos) throws IOException
+ {
+ /* These are for estimating whether we overshoot the MTU limit */
+ int estimate = 0;
+
+ /* serialize the HeartBeatState */
+ HeartBeatState hbState = epState.getHeartBeatState();
+ HeartBeatState.serializer().serialize(hbState, dos);
+
+ /* serialize the map of ApplicationState objects */
+ int size = epState.applicationState_.size();
+ dos.writeInt(size);
+ if ( size > 0 )
+ {
+ Set<String> keys = epState.applicationState_.keySet();
+ for( String key : keys )
+ {
+ if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
+ {
+ logger_.info("@@@@ Breaking out to respect the MTU size in EndPointState serializer. Estimate is " + estimate + " @@@@");
+ break;
+ }
+
+ ApplicationState appState = epState.applicationState_.get(key);
+ if ( appState != null )
+ {
+ int pre = dos.size();
+ dos.writeUTF(key);
+ ApplicationState.serializer().serialize(appState, dos);
+ int post = dos.size();
+ estimate = post - pre;
+ }
+ }
+ }
+ }
+
+ public EndPointState deserialize(DataInputStream dis) throws IOException
+ {
+ HeartBeatState hbState = HeartBeatState.serializer().deserialize(dis);
+ EndPointState epState = new EndPointState(hbState);
+
+ int appStateSize = dis.readInt();
+ for ( int i = 0; i < appStateSize; ++i )
+ {
+ if ( dis.available() == 0 )
+ {
+ break;
+ }
+
+ String key = dis.readUTF();
+ ApplicationState appState = ApplicationState.serializer().deserialize(dis);
+ epState.addApplicationState(key, appState);
+ }
+ return epState;
+ }
+}