You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2012/09/14 17:16:18 UTC
[4/5] git commit: p/4443/020_calculate_pending
p/4443/020_calculate_pending
Wire a collection of relocating tokens into the calculation
of endpoints for pending ranges.
Patch by eevans; reviewed by Brandon Williams for CASSANDRA-4559
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9f643a4a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9f643a4a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9f643a4a
Branch: refs/heads/trunk
Commit: 9f643a4ae62200d47fbe64f2b44260b767449767
Parents: 1f36519
Author: Eric Evans <ee...@apache.org>
Authored: Fri Sep 14 10:08:53 2012 -0500
Committer: Eric Evans <ee...@apache.org>
Committed: Fri Sep 14 10:11:00 2012 -0500
----------------------------------------------------------------------
.../apache/cassandra/locator/TokenMetadata.java | 91 ++++++++++++++-
.../apache/cassandra/service/StorageService.java | 16 +++-
2 files changed, 105 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f643a4a/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index c666c47..0ecb125 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -84,6 +84,8 @@ public class TokenMetadata
// nodes which are migrating to the new tokens in the ring
private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<Pair<Token, InetAddress>>();
+ // tokens which are migrating to new endpoints
+ private final Map<Token, InetAddress> relocatingTokens = new HashMap<Token, InetAddress>();
/* Use this lock for manipulating the token map */
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -347,6 +349,33 @@ public class TokenMetadata
}
}
+ /**
+ * Add new relocating ranges (tokens moving from their respective endpoints, to another).
+ * @param tokens tokens being moved
+ * @param endpoint destination of moves
+ */
+ public void addRelocatingTokens(Collection<Token> tokens, InetAddress endpoint)
+ {
+ assert endpoint != null;
+ assert tokens != null && tokens.size() > 0;
+
+ lock.writeLock().lock();
+
+ try
+ {
+ for (Token token : tokens)
+ {
+ InetAddress prev = relocatingTokens.put(token, endpoint);
+ if (prev != null && !prev.equals(endpoint))
+ logger.warn("Relocation of {} to {} overwrites previous to {}", new Object[]{token, endpoint, prev});
+ }
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
public void removeEndpoint(InetAddress endpoint)
{
assert endpoint != null;
@@ -396,6 +425,38 @@ public class TokenMetadata
}
}
+ /**
+ * Remove pair of token/address from relocating ranges.
+ * @param endpoint
+ */
+ public void removeFromRelocating(Token token, InetAddress endpoint)
+ {
+ assert endpoint != null;
+ assert token != null;
+
+ lock.writeLock().lock();
+
+ try
+ {
+ InetAddress previous = relocatingTokens.remove(token);
+
+ if (previous == null)
+ {
+ logger.debug("Cannot remove {}, not found among the relocating (previously removed?)", token);
+ }
+ else if (!previous.equals(endpoint))
+ {
+ logger.warn(
+ "Removal of relocating token {} with mismatched endpoint ({} != {})",
+ new Object[]{token, endpoint, previous});
+ }
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
public Collection<Token> getTokens(InetAddress endpoint)
{
assert endpoint != null;
@@ -470,6 +531,22 @@ public class TokenMetadata
}
}
+ public boolean isRelocating(Token token)
+ {
+ assert token != null;
+
+ lock.readLock().lock();
+
+ try
+ {
+ return relocatingTokens.containsKey(token);
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
+
/**
* Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges,
* bootstrap tokens and leaving endpoints are not included in the copy.
@@ -513,7 +590,7 @@ public class TokenMetadata
/**
* Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all
- * current leave and move operations have finished.
+ * current leave, move, and relocate operations have finished.
*
* @return new token metadata
*/
@@ -532,6 +609,9 @@ public class TokenMetadata
for (Pair<Token, InetAddress> pair : movingEndpoints)
metadata.updateNormalToken(pair.left, pair.right);
+ for (Map.Entry<Token, InetAddress> relocating: relocatingTokens.entrySet())
+ metadata.updateNormalToken(relocating.getKey(), relocating.getValue());
+
return metadata;
}
finally
@@ -654,6 +734,15 @@ public class TokenMetadata
return movingEndpoints;
}
+ /**
+ * Ranges which are migrating to new endpoints.
+ * @return set of token-address pairs of relocating ranges
+ */
+ public Map<Token, InetAddress> getRelocatingRanges()
+ {
+ return relocatingTokens;
+ }
+
public static int firstTokenIndex(final ArrayList ring, Token start, boolean insertMin)
{
assert ring.size() > 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f643a4a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 88780dd..07ee2bd 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1525,7 +1525,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
// At this stage pendingRanges has been updated according to leaving and bootstrapping nodes.
- // We can now finish the calculation by checking moving nodes.
+ // We can now finish the calculation by checking moving and relocating nodes.
// For each of the moving nodes, we do the same thing we did for bootstrapping:
// simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
@@ -1544,6 +1544,20 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
allLeftMetadata.removeEndpoint(endpoint);
}
+ // Ranges being relocated.
+ for (Map.Entry<Token, InetAddress> relocating : tm.getRelocatingRanges().entrySet())
+ {
+ InetAddress endpoint = relocating.getValue(); // address of the moving node
+ Token token = relocating.getKey();
+
+ allLeftMetadata.updateNormalToken(token, endpoint);
+
+ for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+ pendingRanges.put(range, endpoint);
+
+ allLeftMetadata.removeEndpoint(endpoint);
+ }
+
tm.setPendingRanges(table, pendingRanges);
if (logger.isDebugEnabled())