You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2019/06/03 15:05:07 UTC
[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk
This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit cc1bc2a8e1bcac8cee08289ca04d6490a95f225e
Merge: 797ec05 8db0d84
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Mon Jun 3 16:02:36 2019 +0100
Merge branch 'cassandra-3.11' into trunk
.../apache/cassandra/service/StorageService.java | 156 ++++++++++++---------
.../org/apache/cassandra/service/MoveTest.java | 11 +-
2 files changed, 96 insertions(+), 71 deletions(-)
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index eade7dd,c340db6..2b6bf1d
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2457,18 -2311,95 +2457,94 @@@ public class StorageService extends Not
tokenMetadata.updateHostId(Gossiper.instance.getHostId(newNode), newNode);
}
- private void ensureUpToDateTokenMetadata(String status, InetAddress endpoint)
++ private void ensureUpToDateTokenMetadata(String status, InetAddressAndPort endpoint)
+ {
+ Set<Token> tokens = new TreeSet<>(getTokensFor(endpoint));
+
+ if (logger.isDebugEnabled())
+ logger.debug("Node {} state {}, tokens {}", endpoint, status, tokens);
+
+ // If the node is previously unknown or tokens do not match, update tokenmetadata to
+ // have this node as 'normal' (it must have been using this token before the
+ // leave). This way we'll get pending ranges right.
+ if (!tokenMetadata.isMember(endpoint))
+ {
+ logger.info("Node {} state jump to {}", endpoint, status);
+ updateTokenMetadata(endpoint, tokens);
+ }
+ else if (!tokens.equals(new TreeSet<>(tokenMetadata.getTokens(endpoint))))
+ {
+ logger.warn("Node {} '{}' token mismatch. Long network partition?", endpoint, status);
+ updateTokenMetadata(endpoint, tokens);
+ }
+ }
+
- private void updateTokenMetadata(InetAddress endpoint, Iterable<Token> tokens)
++ private void updateTokenMetadata(InetAddressAndPort endpoint, Iterable<Token> tokens)
+ {
+ updateTokenMetadata(endpoint, tokens, new HashSet<>());
+ }
+
- private void updateTokenMetadata(InetAddress endpoint, Iterable<Token> tokens, Set<InetAddress> endpointsToRemove)
++ private void updateTokenMetadata(InetAddressAndPort endpoint, Iterable<Token> tokens, Set<InetAddressAndPort> endpointsToRemove)
+ {
+ Set<Token> tokensToUpdateInMetadata = new HashSet<>();
+ Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
+
+ for (final Token token : tokens)
+ {
+ // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
- InetAddress currentOwner = tokenMetadata.getEndpoint(token);
++ InetAddressAndPort currentOwner = tokenMetadata.getEndpoint(token);
+ if (currentOwner == null)
+ {
+ logger.debug("New node {} at token {}", endpoint, token);
+ tokensToUpdateInMetadata.add(token);
+ tokensToUpdateInSystemKeyspace.add(token);
+ }
+ else if (endpoint.equals(currentOwner))
+ {
+ // set state back to normal, since the node may have tried to leave, but failed and is now back up
+ tokensToUpdateInMetadata.add(token);
+ tokensToUpdateInSystemKeyspace.add(token);
+ }
+ else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
+ {
+ tokensToUpdateInMetadata.add(token);
+ tokensToUpdateInSystemKeyspace.add(token);
+
+ // currentOwner is no longer current, endpoint is. Keep track of these moves, because when
+ // a host no longer has any tokens, we'll want to remove it.
- Multimap<InetAddress, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
++ Multimap<InetAddressAndPort, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
+ epToTokenCopy.get(currentOwner).remove(token);
+ if (epToTokenCopy.get(currentOwner).isEmpty())
+ endpointsToRemove.add(currentOwner);
+
+ logger.info("Nodes {} and {} have the same token {}. {} is the new owner", endpoint, currentOwner, token, endpoint);
+ }
+ else
+ {
+ logger.info("Nodes {} and {} have the same token {}. Ignoring {}", endpoint, currentOwner, token, endpoint);
+ }
+ }
+
+ tokenMetadata.updateNormalTokens(tokensToUpdateInMetadata, endpoint);
- for (InetAddress ep : endpointsToRemove)
++ for (InetAddressAndPort ep : endpointsToRemove)
+ {
+ removeEndpoint(ep);
+ if (replacing && ep.equals(DatabaseDescriptor.getReplaceAddress()))
+ Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
+ }
+ if (!tokensToUpdateInSystemKeyspace.isEmpty())
- SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace, StageManager.getStage(Stage.MUTATION));
++ SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace);
+ }
-
/**
* Handle node move to normal state. That is, node is entering token ring and participating
* in reads.
*
* @param endpoint node
*/
- private void handleStateNormal(final InetAddress endpoint, final String status)
+ private void handleStateNormal(final InetAddressAndPort endpoint, final String status)
{
Collection<Token> tokens = getTokensFor(endpoint);
- Set<Token> tokensToUpdateInMetadata = new HashSet<>();
- Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
- Set<InetAddress> endpointsToRemove = new HashSet<>();
+ Set<InetAddressAndPort> endpointsToRemove = new HashSet<>();
if (logger.isDebugEnabled())
logger.debug("Node {} state {}, token {}", endpoint, status, tokens);
@@@ -2610,13 -2490,8 +2635,8 @@@
*
* @param endpoint node
*/
- private void handleStateLeaving(InetAddress endpoint)
+ private void handleStateLeaving(InetAddressAndPort endpoint)
{
- Collection<Token> tokens = getTokensFor(endpoint);
-
- if (logger.isDebugEnabled())
- logger.debug("Node {} state leaving, tokens {}", endpoint, tokens);
-
// If the node is previously unknown or tokens do not match, update tokenmetadata to
// have this node as 'normal' (it must have been using this token before the
// leave). This way we'll get pending ranges right.
@@@ -2660,8 -2527,10 +2672,10 @@@
* @param endpoint moving endpoint address
* @param pieces STATE_MOVING, token
*/
- private void handleStateMoving(InetAddress endpoint, String[] pieces)
+ private void handleStateMoving(InetAddressAndPort endpoint, String[] pieces)
{
+ ensureUpToDateTokenMetadata(VersionedValue.STATUS_MOVING, endpoint);
+
assert pieces.length >= 2;
Token token = getTokenFactory().fromString(pieces[1]);
diff --cc test/unit/org/apache/cassandra/service/MoveTest.java
index a7cfc1b,39ebd66..a4da7b8
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@@ -496,14 -480,22 +496,23 @@@ public class MoveTes
{
tmd.removeFromMoving(host);
assertTrue(!tmd.isMoving(host));
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(token)), host);
+ Token newToken = new BigIntegerToken(String.valueOf(token));
+ tmd.updateNormalToken(newToken, host);
+ // As well as updating TMD, update the host's tokens in gossip. Since CASSANDRA-15120, status changing to MOVING
+ // ensures that TMD is up to date with token assignments according to gossip. So we need to make sure gossip has
+ // the correct new token, as the moving node itself would do upon successful completion of the move operation.
+ // Without this, the next movement for that host will set the token in TMD back to the old value from gossip
+ // and incorrect range movements will follow.
+ Gossiper.instance.injectApplicationState(host,
+ ApplicationState.TOKENS,
+ new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(newToken)));
}
- private Map.Entry<Range<Token>, Collection<InetAddress>> generatePendingMapEntry(int start, int end, String... endpoints) throws UnknownHostException
+ private Map.Entry<Range<Token>, EndpointsForRange> generatePendingMapEntry(int start, int end, String... endpoints) throws UnknownHostException
{
- Map<Range<Token>, Collection<InetAddress>> pendingRanges = new HashMap<>();
- pendingRanges.put(generateRange(start, end), makeAddrs(endpoints));
+ Map<Range<Token>, EndpointsForRange> pendingRanges = new HashMap<>();
+ Range<Token> range = generateRange(start, end);
+ pendingRanges.put(range, makeReplicas(range, endpoints));
return pendingRanges.entrySet().iterator().next();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org