You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/16 19:13:35 UTC
svn commit: r891356 - in /incubator/cassandra/branches/cassandra-0.5: ./
src/java/org/apache/cassandra/locator/
src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/tools/
test/unit/org/apache/cassandra/dht/ test/unit/org/apache/cassand...
Author: jbellis
Date: Wed Dec 16 18:13:34 2009
New Revision: 891356
URL: http://svn.apache.org/viewvc?rev=891356&view=rev
Log:
Fix pending range conflicts when bootstrapping or moving multiple nodes at once. patch by Jaakko Laine; reviewed by jbellis for CASSANDRA-603
Modified:
incubator/cassandra/branches/cassandra-0.5/CHANGES.txt
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageServiceMBean.java
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/tools/NodeProbe.java
incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
Modified: incubator/cassandra/branches/cassandra-0.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/CHANGES.txt?rev=891356&r1=891355&r2=891356&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/CHANGES.txt (original)
+++ incubator/cassandra/branches/cassandra-0.5/CHANGES.txt Wed Dec 16 18:13:34 2009
@@ -4,6 +4,8 @@
* fix data streaming on windows (CASSANDRA-630)
* GC compacted sstables after cleanup and compaction (CASSANDRA-621)
* Speed up anti-entropy validation (CASSANDRA-629)
+ * Fix pending range conflicts when bootstrapping or moving
+ multiple nodes at once (CASSANDRA-603)
0.5.0 beta 2
Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=891356&r1=891355&r2=891356&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Wed Dec 16 18:13:34 2009
@@ -91,11 +91,11 @@
List<InetAddress> endpoints = new ArrayList<InetAddress>(naturalEndpoints);
- for (Map.Entry<Range, InetAddress> entry : tokenMetadata_.getPendingRanges().entrySet())
+ for (Map.Entry<Range, Collection<InetAddress>> entry : tokenMetadata_.getPendingRanges().entrySet())
{
if (entry.getKey().contains(token))
{
- endpoints.add(entry.getValue());
+ endpoints.addAll(entry.getValue());
}
}
@@ -202,26 +202,9 @@
public Collection<Range> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress)
{
- TokenMetadata temp = metadata.cloneWithoutPending();
- temp.update(pendingToken, pendingAddress);
+ TokenMetadata temp = metadata.cloneOnlyTokenMap();
+ temp.updateNormalToken(pendingToken, pendingAddress);
return getAddressRanges(temp).get(pendingAddress);
}
- public void removeObsoletePendingRanges()
- {
- Multimap<InetAddress, Range> ranges = getAddressRanges();
- for (Map.Entry<Range, InetAddress> entry : tokenMetadata_.getPendingRanges().entrySet())
- {
- for (Range currentRange : ranges.get(entry.getValue()))
- {
- if (currentRange.contains(entry.getKey()))
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Removing obsolete pending range " + entry.getKey() + " from " + entry.getValue());
- tokenMetadata_.removePendingRange(entry.getKey());
- break;
- }
- }
- }
- }
}
Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=891356&r1=891355&r2=891356&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java Wed Dec 16 18:13:34 2009
@@ -29,15 +29,33 @@
import org.apache.commons.lang.StringUtils;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.HashMultimap;
public class TokenMetadata
{
/* Maintains token to endpoint map of every node in the cluster. */
private BiMap<Token, InetAddress> tokenToEndPointMap;
- private Map<Range, InetAddress> pendingRanges;
+
+ // Suppose that there is a ring of nodes A, C and E, with replication factor 3.
+ // Node D bootstraps between C and E, so its pending ranges will be E-A, A-C and C-D.
+ // Now suppose node B bootstraps between A and C at the same time. Its pending ranges would be C-E, E-A and A-B.
+ // Now both nodes have pending range E-A in their list, which will cause pending range collision
+ // even though we're only talking about replica range, not even primary range. The same thing happens
+ // for any nodes that boot simultaneously between same two nodes. For this we cannot simply make pending ranges a multimap,
+ // since that would make us unable to notice the real problem of two nodes trying to boot using the same token.
+ // In order to do this properly, we need to know what tokens are booting at any time.
+ private Map<Token, InetAddress> bootstrapTokens;
+
+ // we will need to know at all times what nodes are leaving and calculate ranges accordingly.
+ // An anonymous pending ranges list is not enough, as that does not tell which node is leaving
+ // and/or if the ranges are there because of bootstrap or leave operation.
+ // (See CASSANDRA-603 for more detail + examples).
+ private Set<InetAddress> leavingEndPoints;
+
+ private Multimap<Range, InetAddress> pendingRanges;
/* Use this lock for manipulating the token map */
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -53,7 +71,9 @@
if (tokenToEndPointMap == null)
tokenToEndPointMap = HashBiMap.create();
this.tokenToEndPointMap = tokenToEndPointMap;
- pendingRanges = new NonBlockingHashMap<Range, InetAddress>();
+ bootstrapTokens = new HashMap<Token, InetAddress>();
+ leavingEndPoints = new HashSet<InetAddress>();
+ pendingRanges = HashMultimap.create();
sortedTokens = sortTokens();
}
@@ -69,18 +89,13 @@
{
int n = 0;
Range sourceRange = getPrimaryRangeFor(getToken(source));
- for (Map.Entry<Range, InetAddress> entry : pendingRanges.entrySet())
- {
- if (sourceRange.contains(entry.getKey()) || entry.getValue().equals(source))
+ for (Token token : bootstrapTokens.keySet())
+ if (sourceRange.contains(token))
n++;
- }
return n;
}
- /**
- * Update the two maps in an safe mode.
- */
- public void update(Token token, InetAddress endpoint)
+ public void updateNormalToken(Token token, InetAddress endpoint)
{
assert token != null;
assert endpoint != null;
@@ -88,6 +103,8 @@
lock.writeLock().lock();
try
{
+ bootstrapTokens.remove(token);
+
tokenToEndPointMap.inverse().remove(endpoint);
if (!endpoint.equals(tokenToEndPointMap.put(token, endpoint)))
{
@@ -100,13 +117,49 @@
}
}
+ public void addBootstrapToken(Token token, InetAddress endpoint)
+ {
+ assert token != null;
+ assert endpoint != null;
+
+ lock.writeLock().lock();
+ try
+ {
+ InetAddress oldEndPoint = bootstrapTokens.get(token);
+ if (oldEndPoint != null && !oldEndPoint.equals(endpoint))
+ throw new RuntimeException("Bootstrap Token collision between " + oldEndPoint + " and " + endpoint + " (token " + token);
+ bootstrapTokens.put(token, endpoint);
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void addLeavingEndPoint(InetAddress endpoint)
+ {
+ assert endpoint != null;
+
+ lock.writeLock().lock();
+ try
+ {
+ leavingEndPoints.add(endpoint);
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
public void removeEndpoint(InetAddress endpoint)
{
assert tokenToEndPointMap.containsValue(endpoint);
lock.writeLock().lock();
try
{
+ bootstrapTokens.remove(getToken(endpoint));
tokenToEndPointMap.inverse().remove(endpoint);
+ leavingEndPoints.remove(endpoint);
sortedTokens = sortTokens();
}
finally
@@ -161,7 +214,11 @@
}
}
- public TokenMetadata cloneWithoutPending()
+ /**
+ * Create a copy of TokenMetadata with only tokenToEndPointMap. That is, pending ranges,
+ * bootstrap tokens and leaving endpoints are not included in the copy.
+ */
+ public TokenMetadata cloneOnlyTokenMap()
{
lock.readLock().lock();
try
@@ -174,28 +231,24 @@
}
}
- public String toString()
+ /**
+ * Create a copy of TokenMetadata with tokenToEndPointMap reflecting situation after all
+ * current leave operations have finished.
+ */
+ public TokenMetadata cloneAfterAllLeft()
{
- StringBuilder sb = new StringBuilder();
lock.readLock().lock();
try
{
- Set<InetAddress> eps = tokenToEndPointMap.inverse().keySet();
-
- for (InetAddress ep : eps)
- {
- sb.append(ep);
- sb.append(":");
- sb.append(tokenToEndPointMap.inverse().get(ep));
- sb.append(System.getProperty("line.separator"));
- }
+ TokenMetadata allLeftMetadata = cloneOnlyTokenMap();
+ for (InetAddress endPoint : leavingEndPoints)
+ allLeftMetadata.removeEndpoint(endPoint);
+ return allLeftMetadata;
}
finally
{
lock.readLock().unlock();
}
-
- return sb.toString();
}
public InetAddress getEndPoint(Token token)
@@ -211,12 +264,6 @@
}
}
- public void clearUnsafe()
- {
- tokenToEndPointMap.clear();
- pendingRanges.clear();
- }
-
public Range getPrimaryRangeFor(Token right)
{
return new Range(getPredecessor(right), right);
@@ -235,29 +282,16 @@
}
}
- public void addPendingRange(Range range, InetAddress endpoint)
- {
- InetAddress oldEndpoint = pendingRanges.get(range);
- if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
- throw new RuntimeException("pending range collision between " + oldEndpoint + " and " + endpoint);
- pendingRanges.put(range, endpoint);
- }
-
- public void removePendingRange(Range range)
- {
- pendingRanges.remove(range);
- }
-
/** a mutable map may be returned but caller should not modify it */
- public Map<Range, InetAddress> getPendingRanges()
+ public Map<Range, Collection<InetAddress>> getPendingRanges()
{
- return pendingRanges;
+ return pendingRanges.asMap();
}
public List<Range> getPendingRanges(InetAddress endpoint)
{
List<Range> ranges = new ArrayList<Range>();
- for (Map.Entry<Range, InetAddress> entry : pendingRanges.entrySet())
+ for (Map.Entry<Range, InetAddress> entry : pendingRanges.entries())
{
if (entry.getValue().equals(endpoint))
{
@@ -267,6 +301,11 @@
return ranges;
}
+ public void setPendingRanges(Multimap<Range, InetAddress> pendingRanges)
+ {
+ this.pendingRanges = pendingRanges;
+ }
+
public Token getPredecessor(Token token)
{
List tokens = sortedTokens();
@@ -288,8 +327,96 @@
return getEndPoint(getSuccessor(getToken(endPoint)));
}
- public void clearPendingRanges()
+ /** caller should not modify bootstrapTokens */
+ public Map<Token, InetAddress> getBootstrapTokens()
+ {
+ return bootstrapTokens;
+ }
+
+ /** caller should not modify leavingEndPoints */
+ public Set<InetAddress> getLeavingEndPoints()
{
+ return leavingEndPoints;
+ }
+
+ /** used by tests */
+ public void clearUnsafe()
+ {
+ bootstrapTokens.clear();
+ tokenToEndPointMap.clear();
+ leavingEndPoints.clear();
pendingRanges.clear();
}
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ lock.readLock().lock();
+ try
+ {
+ Set<InetAddress> eps = tokenToEndPointMap.inverse().keySet();
+
+ if (!eps.isEmpty())
+ {
+ sb.append("Normal Tokens:");
+ sb.append(System.getProperty("line.separator"));
+ for (InetAddress ep : eps)
+ {
+ sb.append(ep);
+ sb.append(":");
+ sb.append(tokenToEndPointMap.inverse().get(ep));
+ sb.append(System.getProperty("line.separator"));
+ }
+ }
+
+ if (!bootstrapTokens.isEmpty())
+ {
+ sb.append("Bootstrapping Tokens:" );
+ sb.append(System.getProperty("line.separator"));
+ for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
+ {
+ sb.append(entry.getValue() + ":" + entry.getKey());
+ sb.append(System.getProperty("line.separator"));
+ }
+ }
+
+ if (!leavingEndPoints.isEmpty())
+ {
+ sb.append("Leaving EndPoints:");
+ sb.append(System.getProperty("line.separator"));
+ for (InetAddress ep : leavingEndPoints)
+ {
+ sb.append(ep);
+ sb.append(System.getProperty("line.separator"));
+ }
+ }
+
+ if (!pendingRanges.isEmpty())
+ {
+ sb.append("Pending Ranges:");
+ sb.append(System.getProperty("line.separator"));
+ sb.append(printPendingRanges());
+ }
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+
+ return sb.toString();
+ }
+
+ public String printPendingRanges()
+ {
+ StringBuilder sb = new StringBuilder();
+
+ for (Map.Entry<Range, InetAddress> entry : pendingRanges.entries())
+ {
+ sb.append(entry.getValue() + ":" + entry.getKey());
+ sb.append(System.getProperty("line.separator"));
+ }
+
+ return sb.toString();
+ }
+
}
Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java?rev=891356&r1=891355&r2=891356&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java Wed Dec 16 18:13:34 2009
@@ -193,7 +193,7 @@
if (logger_.isDebugEnabled())
logger_.debug("Setting token to " + token);
SystemTable.updateToken(token);
- tokenMetadata_.update(token, FBUtilities.getLocalAddress());
+ tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress());
}
public StorageService()
@@ -316,7 +316,7 @@
{
SystemTable.setBootstrapped(true);
Token token = storageMetadata_.getToken();
- tokenMetadata_.update(token, FBUtilities.getLocalAddress());
+ tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress());
Gossiper.instance().addApplicationState(StorageService.STATE_NORMAL, new ApplicationState(partitioner_.getTokenFactory().toString(token)));
}
@@ -417,23 +417,25 @@
Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
if (logger_.isDebugEnabled())
logger_.debug(endpoint + " state bootstrapping, token " + token);
- updateBootstrapRanges(token, endpoint);
+ tokenMetadata_.addBootstrapToken(token, endpoint);
+ calculatePendingRanges();
}
else if (STATE_NORMAL.equals(stateName))
{
Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
if (logger_.isDebugEnabled())
logger_.debug(endpoint + " state normal, token " + token);
- tokenMetadata_.update(token, endpoint);
+ tokenMetadata_.updateNormalToken(token, endpoint);
+ calculatePendingRanges();
if (!isClientMode)
SystemTable.updateToken(endpoint, token);
- replicationStrategy_.removeObsoletePendingRanges();
}
else if (STATE_LEAVING.equals(stateName))
{
Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
assert tokenMetadata_.getToken(endpoint).equals(token);
- updateLeavingRanges(endpoint);
+ tokenMetadata_.addLeavingEndPoint(endpoint);
+ calculatePendingRanges();
}
else if (STATE_LEFT.equals(stateName))
{
@@ -452,6 +454,7 @@
logger_.debug(endpoint + " state left, token " + token);
assert tokenMetadata_.getToken(endpoint).equals(token);
tokenMetadata_.removeEndpoint(endpoint);
+ calculatePendingRanges();
}
}
else
@@ -464,11 +467,94 @@
{
restoreReplicaCount(endPointThatLeft);
tokenMetadata_.removeEndpoint(endPointThatLeft);
+ calculatePendingRanges();
}
}
+ }
+ }
+
+ /**
+ * Calculate pending ranges according to bootstrapping and leaving nodes. Reasoning is:
+ *
+ * (1) When in doubt, it is better to write too much to a node than too little. That is, if
+ * there are multiple nodes moving, calculate the biggest ranges a node could have. Cleaning
+ * up unneeded data afterwards is better than missing writes during movement.
+ * (2) When a node leaves, ranges for other nodes can only grow (a node might get additional
+ * ranges, but it will not lose any of its current ranges as a result of a leave). Therefore
+ * we will first remove _all_ leaving tokens for the sake of calculation and then check what
+ * ranges would go where if all nodes are to leave. This way we get the biggest possible
+ * ranges with regard to current leave operations, covering all subsets of possible final range
+ * values.
+ * (3) When a node bootstraps, ranges of other nodes can only get smaller. Without doing
+ * complex calculations to see if multiple bootstraps overlap, we simply base calculations
+ * on the same token ring used before (reflecting situation after all leave operations have
+ * completed). Bootstrapping nodes will be added and removed one by one to that metadata and
+ * checked what their ranges would be. This will give us the biggest possible ranges the
+ * node could have. It might be that other bootstraps make our actual final ranges smaller,
+ * but it does not matter as we can clean up the data afterwards.
+ *
+ * NOTE: This is a heavy and inefficient operation. This will be done only once when a node
+ * changes state in the cluster, so it should be manageable.
+ */
+ private void calculatePendingRanges()
+ {
+ calculatePendingRanges(tokenMetadata_, replicationStrategy_);
+ }
+
+ // public & static for testing purposes
+ public static void calculatePendingRanges(TokenMetadata tm, AbstractReplicationStrategy strategy)
+ {
+ Multimap<Range, InetAddress> pendingRanges = HashMultimap.create();
+ Map<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
+ Set<InetAddress> leavingEndPoints = tm.getLeavingEndPoints();
+
+ if (bootstrapTokens.isEmpty() && leavingEndPoints.isEmpty())
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("No bootstrapping or leaving nodes -> empty pending ranges");
+ tm.setPendingRanges(pendingRanges);
+ return;
+ }
+
+ Multimap<InetAddress, Range> addressRanges = strategy.getAddressRanges();
+
+ // Copy of metadata reflecting the situation after all leave operations are finished.
+ TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft();
+
+ // get all ranges that will be affected by leaving nodes
+ Set<Range> affectedRanges = new HashSet<Range>();
+ for (InetAddress endPoint : leavingEndPoints)
+ affectedRanges.addAll(addressRanges.get(endPoint));
+
+ // for each of those ranges, find what new nodes will be responsible for the range when
+ // all leaving nodes are gone.
+ for (Range range : affectedRanges)
+ {
+ List<InetAddress> currentEndPoints = strategy.getNaturalEndpoints(range.right(), tm);
+ List<InetAddress> newEndPoints = strategy.getNaturalEndpoints(range.right(), allLeftMetadata);
+ newEndPoints.removeAll(currentEndPoints);
+ pendingRanges.putAll(range, newEndPoints);
+ }
+
+ // At this stage pendingRanges has been updated according to leave operations. We can
+ // now finish the calculation by checking bootstrapping nodes.
+
+ // For each of the bootstrapping nodes, simply add and remove them one by one to
+ // allLeftMetadata and check in between what their ranges would be.
+ for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
+ {
+ InetAddress endPoint = entry.getValue();
- replicationStrategy_.removeObsoletePendingRanges();
+ allLeftMetadata.updateNormalToken(entry.getKey(), endPoint);
+ for (Range range : strategy.getAddressRanges(allLeftMetadata).get(endPoint))
+ pendingRanges.put(range, endPoint);
+ allLeftMetadata.removeEndpoint(endPoint);
}
+
+ tm.setPendingRanges(pendingRanges);
+
+ if (logger_.isDebugEnabled())
+ logger_.debug("Pending ranges:\n" + tm.printPendingRanges());
}
/**
@@ -544,7 +630,7 @@
Collection<Range> ranges = getRangesForEndPoint(endpoint);
if (logger_.isDebugEnabled())
- logger_.debug("leaving node ranges are [" + StringUtils.join(ranges, ", ") + "]");
+ logger_.debug("Node " + endpoint + " ranges [" + StringUtils.join(ranges, ", ") + "]");
Map<Range, ArrayList<InetAddress>> currentReplicaEndpoints = new HashMap<Range, ArrayList<InetAddress>>();
@@ -552,7 +638,7 @@
for (Range range : ranges)
currentReplicaEndpoints.put(range, replicationStrategy_.getNaturalEndpoints(range.right(), tokenMetadata_));
- TokenMetadata temp = tokenMetadata_.cloneWithoutPending();
+ TokenMetadata temp = tokenMetadata_.cloneAfterAllLeft();
temp.removeEndpoint(endpoint);
Multimap<Range, InetAddress> changedRanges = HashMultimap.create();
@@ -567,43 +653,13 @@
ArrayList<InetAddress> newReplicaEndpoints = replicationStrategy_.getNaturalEndpoints(range.right(), temp);
newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range));
if (logger_.isDebugEnabled())
- logger_.debug("adding pending range " + range + " to endpoints " + StringUtils.join(newReplicaEndpoints, ", "));
+ logger_.debug("Range " + range + " will be responsibility of " + StringUtils.join(newReplicaEndpoints, ", "));
changedRanges.putAll(range, newReplicaEndpoints);
}
return changedRanges;
}
- private void updateLeavingRanges(final InetAddress endpoint)
- {
- if (logger_.isDebugEnabled())
- logger_.debug(endpoint + " is leaving; calculating pendingranges");
- Multimap<Range, InetAddress> ranges = getChangedRangesForLeaving(endpoint);
- for (Range range : ranges.keySet())
- {
- for (InetAddress newEndpoint : ranges.get(range))
- {
- tokenMetadata_.addPendingRange(range, newEndpoint);
- }
- }
- }
-
- private void updateBootstrapRanges(Token token, InetAddress endpoint)
- {
- for (Range range : replicationStrategy_.getPendingAddressRanges(tokenMetadata_, token, endpoint))
- {
- tokenMetadata_.addPendingRange(range, endpoint);
- }
- }
-
- public static void updateBootstrapRanges(AbstractReplicationStrategy strategy, TokenMetadata metadata, Token token, InetAddress endpoint)
- {
- for (Range range : strategy.getPendingAddressRanges(metadata, token, endpoint))
- {
- metadata.addPendingRange(range, endpoint);
- }
- }
-
public void onJoin(InetAddress endpoint, EndPointState epState)
{
for (Map.Entry<String,ApplicationState> entry : epState.getSortedApplicationStates())
@@ -1127,7 +1183,6 @@
{
SystemTable.setBootstrapped(false);
tokenMetadata_.removeEndpoint(FBUtilities.getLocalAddress());
- replicationStrategy_.removeObsoletePendingRanges();
if (logger_.isDebugEnabled())
logger_.debug("");
@@ -1248,7 +1303,6 @@
restoreReplicaCount(endPoint);
tokenMetadata_.removeEndpoint(endPoint);
- replicationStrategy_.removeObsoletePendingRanges();
}
// This is not the cleanest way as we're adding STATE_LEFT for
@@ -1271,11 +1325,6 @@
return replicationStrategy_;
}
- public void cancelPendingRanges()
- {
- tokenMetadata_.clearPendingRanges();
- }
-
public boolean isClientMode()
{
return isClientMode;
Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=891356&r1=891355&r2=891356&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageServiceMBean.java Wed Dec 16 18:13:34 2009
@@ -141,13 +141,6 @@
public void loadBalance() throws IOException, InterruptedException;
/**
- * cancel writes to nodes that are set to be changing ranges.
- * Only do this if the reason for the range changes no longer exists
- * (e.g., a bootstrapping node was killed or crashed.)
- */
- public void cancelPendingRanges();
-
- /**
* removeToken removes token (and all data associated with
* enpoint that had it) from the ring
*/
Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=891356&r1=891355&r2=891356&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/tools/NodeProbe.java Wed Dec 16 18:13:34 2009
@@ -398,11 +398,6 @@
ssProxy.move(newToken);
}
- public void cancelPendingRanges()
- {
- ssProxy.cancelPendingRanges();
- }
-
public void removeToken(String token)
{
ssProxy.removeToken(token);
@@ -503,7 +498,7 @@
HelpFormatter hf = new HelpFormatter();
String header = String.format(
"%nAvailable commands: ring, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, " +
- "tpstats, flush, repair, decommission, move, loadbalance, cancelpending, removetoken, " +
+ "tpstats, flush, repair, decommission, move, loadbalance, removetoken, " +
" getcompactionthreshold, setcompactionthreshold [minthreshold] ([maxthreshold])");
String usage = String.format("java %s -host <arg> <command>%n", NodeProbe.class.getName());
hf.printHelp(usage, "", options, header);
@@ -578,10 +573,6 @@
}
probe.move(arguments[1]);
}
- else if (cmdName.equals("cancelpending"))
- {
- probe.cancelPendingRanges();
- }
else if (cmdName.equals("removetoken"))
{
if (arguments.length <= 1)
Modified: incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=891356&r1=891355&r2=891356&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Wed Dec 16 18:13:34 2009
@@ -32,6 +32,7 @@
import com.google.common.collect.Multimap;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
@@ -61,7 +62,8 @@
Range range3 = ss.getPrimaryRangeForEndPoint(three);
Token fakeToken = ((IPartitioner)StorageService.getPartitioner()).midpoint(range3.left(), range3.right());
assert range3.contains(fakeToken);
- StorageService.updateBootstrapRanges(StorageService.instance().getReplicationStrategy(), tmd, fakeToken, myEndpoint);
+ ss.onChange(myEndpoint, StorageService.STATE_BOOTSTRAPPING, new ApplicationState(ss.getPartitioner().getTokenFactory().toString(fakeToken)));
+ tmd = ss.getTokenMetadata();
InetAddress source2 = BootStrapper.getBootstrapSource(tmd, load);
assert two.equals(source2) : source2;
@@ -124,7 +126,7 @@
for (int i = 1; i <= numOldNodes; i++)
{
// leave .1 for myEndpoint
- tmd.update(p.getRandomToken(), InetAddress.getByName("127.0.0." + (i + 1)));
+ tmd.updateNormalToken(p.getRandomToken(), InetAddress.getByName("127.0.0." + (i + 1)));
}
}
}
\ No newline at end of file
Modified: incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=891356&r1=891355&r2=891356&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java Wed Dec 16 18:13:34 2009
@@ -79,7 +79,7 @@
for (int i = 0; i < endPointTokens.length; i++)
{
InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
- tmd.update(endPointTokens[i], ep);
+ tmd.updateNormalToken(endPointTokens[i], ep);
hosts.add(ep);
}
@@ -114,15 +114,16 @@
for (int i = 0; i < endPointTokens.length; i++)
{
InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
- tmd.update(endPointTokens[i], ep);
+ tmd.updateNormalToken(endPointTokens[i], ep);
hosts.add(ep);
}
//Add bootstrap node id=6
Token bsToken = new BigIntegerToken(String.valueOf(25));
InetAddress bootstrapEndPoint = InetAddress.getByName("127.0.0.6");
- StorageService.updateBootstrapRanges(strategy, tmd, bsToken, bootstrapEndPoint);
-
+ tmd.addBootstrapToken(bsToken, bootstrapEndPoint);
+ StorageService.calculatePendingRanges(tmd, strategy);
+
for (int i = 0; i < keyTokens.length; i++)
{
Collection<InetAddress> endPoints = strategy.getWriteEndpoints(keyTokens[i], strategy.getNaturalEndpoints(keyTokens[i]));
@@ -136,6 +137,8 @@
// for 5, 15, 25 this should include bootstrap node
if (i < 3)
assertTrue(endPoints.contains(bootstrapEndPoint));
+ else
+ assertFalse(endPoints.contains(bootstrapEndPoint));
}
}
}