You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2019/06/03 15:05:07 UTC

[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit cc1bc2a8e1bcac8cee08289ca04d6490a95f225e
Merge: 797ec05 8db0d84
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Mon Jun 3 16:02:36 2019 +0100

    Merge branch 'cassandra-3.11' into trunk

 .../apache/cassandra/service/StorageService.java   | 156 ++++++++++++---------
 .../org/apache/cassandra/service/MoveTest.java     |  11 +-
 2 files changed, 96 insertions(+), 71 deletions(-)

diff --cc src/java/org/apache/cassandra/service/StorageService.java
index eade7dd,c340db6..2b6bf1d
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2457,18 -2311,95 +2457,94 @@@ public class StorageService extends Not
          tokenMetadata.updateHostId(Gossiper.instance.getHostId(newNode), newNode);
      }
  
 -    private void ensureUpToDateTokenMetadata(String status, InetAddress endpoint)
++    private void ensureUpToDateTokenMetadata(String status, InetAddressAndPort endpoint)
+     {
+         Set<Token> tokens = new TreeSet<>(getTokensFor(endpoint));
+ 
+         if (logger.isDebugEnabled())
+             logger.debug("Node {} state {}, tokens {}", endpoint, status, tokens);
+ 
+         // If the node is previously unknown or tokens do not match, update tokenmetadata to
+         // have this node as 'normal' (it must have been using this token before the
+         // leave). This way we'll get pending ranges right.
+         if (!tokenMetadata.isMember(endpoint))
+         {
+             logger.info("Node {} state jump to {}", endpoint, status);
+             updateTokenMetadata(endpoint, tokens);
+         }
+         else if (!tokens.equals(new TreeSet<>(tokenMetadata.getTokens(endpoint))))
+         {
+             logger.warn("Node {} '{}' token mismatch. Long network partition?", endpoint, status);
+             updateTokenMetadata(endpoint, tokens);
+         }
+     }
+ 
 -    private void updateTokenMetadata(InetAddress endpoint, Iterable<Token> tokens)
++    private void updateTokenMetadata(InetAddressAndPort endpoint, Iterable<Token> tokens)
+     {
+         updateTokenMetadata(endpoint, tokens, new HashSet<>());
+     }
+ 
 -    private void updateTokenMetadata(InetAddress endpoint, Iterable<Token> tokens, Set<InetAddress> endpointsToRemove)
++    private void updateTokenMetadata(InetAddressAndPort endpoint, Iterable<Token> tokens, Set<InetAddressAndPort> endpointsToRemove)
+     {
+         Set<Token> tokensToUpdateInMetadata = new HashSet<>();
+         Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
+ 
+         for (final Token token : tokens)
+         {
+             // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
 -            InetAddress currentOwner = tokenMetadata.getEndpoint(token);
++            InetAddressAndPort currentOwner = tokenMetadata.getEndpoint(token);
+             if (currentOwner == null)
+             {
+                 logger.debug("New node {} at token {}", endpoint, token);
+                 tokensToUpdateInMetadata.add(token);
+                 tokensToUpdateInSystemKeyspace.add(token);
+             }
+             else if (endpoint.equals(currentOwner))
+             {
+                 // set state back to normal, since the node may have tried to leave, but failed and is now back up
+                 tokensToUpdateInMetadata.add(token);
+                 tokensToUpdateInSystemKeyspace.add(token);
+             }
+             else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
+             {
+                 tokensToUpdateInMetadata.add(token);
+                 tokensToUpdateInSystemKeyspace.add(token);
+ 
+                 // currentOwner is no longer current, endpoint is.  Keep track of these moves, because when
+                 // a host no longer has any tokens, we'll want to remove it.
 -                Multimap<InetAddress, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
++                Multimap<InetAddressAndPort, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
+                 epToTokenCopy.get(currentOwner).remove(token);
+                 if (epToTokenCopy.get(currentOwner).isEmpty())
+                     endpointsToRemove.add(currentOwner);
+ 
+                 logger.info("Nodes {} and {} have the same token {}. {} is the new owner", endpoint, currentOwner, token, endpoint);
+             }
+             else
+             {
+                 logger.info("Nodes () and {} have the same token {}.  Ignoring {}", endpoint, currentOwner, token, endpoint);
+             }
+         }
+ 
+         tokenMetadata.updateNormalTokens(tokensToUpdateInMetadata, endpoint);
 -        for (InetAddress ep : endpointsToRemove)
++        for (InetAddressAndPort ep : endpointsToRemove)
+         {
+             removeEndpoint(ep);
+             if (replacing && ep.equals(DatabaseDescriptor.getReplaceAddress()))
+                 Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
+         }
+         if (!tokensToUpdateInSystemKeyspace.isEmpty())
 -            SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace, StageManager.getStage(Stage.MUTATION));
++            SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace);
+     }
 -
      /**
       * Handle node move to normal state. That is, node is entering token ring and participating
       * in reads.
       *
       * @param endpoint node
       */
 -    private void handleStateNormal(final InetAddress endpoint, final String status)
 +    private void handleStateNormal(final InetAddressAndPort endpoint, final String status)
      {
          Collection<Token> tokens = getTokensFor(endpoint);
-         Set<Token> tokensToUpdateInMetadata = new HashSet<>();
-         Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
 -        Set<InetAddress> endpointsToRemove = new HashSet<>();
 +        Set<InetAddressAndPort> endpointsToRemove = new HashSet<>();
  
          if (logger.isDebugEnabled())
              logger.debug("Node {} state {}, token {}", endpoint, status, tokens);
@@@ -2610,13 -2490,8 +2635,8 @@@
       *
       * @param endpoint node
       */
 -    private void handleStateLeaving(InetAddress endpoint)
 +    private void handleStateLeaving(InetAddressAndPort endpoint)
      {
-         Collection<Token> tokens = getTokensFor(endpoint);
- 
-         if (logger.isDebugEnabled())
-             logger.debug("Node {} state leaving, tokens {}", endpoint, tokens);
- 
          // If the node is previously unknown or tokens do not match, update tokenmetadata to
          // have this node as 'normal' (it must have been using this token before the
          // leave). This way we'll get pending ranges right.
@@@ -2660,8 -2527,10 +2672,10 @@@
       * @param endpoint moving endpoint address
       * @param pieces STATE_MOVING, token
       */
 -    private void handleStateMoving(InetAddress endpoint, String[] pieces)
 +    private void handleStateMoving(InetAddressAndPort endpoint, String[] pieces)
      {
+         ensureUpToDateTokenMetadata(VersionedValue.STATUS_MOVING, endpoint);
+ 
          assert pieces.length >= 2;
          Token token = getTokenFactory().fromString(pieces[1]);
  
diff --cc test/unit/org/apache/cassandra/service/MoveTest.java
index a7cfc1b,39ebd66..a4da7b8
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@@ -496,14 -480,22 +496,23 @@@ public class MoveTes
      {
          tmd.removeFromMoving(host);
          assertTrue(!tmd.isMoving(host));
-         tmd.updateNormalToken(new BigIntegerToken(String.valueOf(token)), host);
+         Token newToken = new BigIntegerToken(String.valueOf(token));
+         tmd.updateNormalToken(newToken, host);
+         // As well as updating TMD, update the host's tokens in gossip. Since CASSANDRA-15120, status changing to MOVING
+         // ensures that TMD is up to date with token assignments according to gossip. So we need to make sure gossip has
+         // the correct new token, as the moving node itself would do upon successful completion of the move operation.
+         // Without this, the next movement for that host will set the token in TMD back to the old value from gossip
+         // and incorrect range movements will follow
+         Gossiper.instance.injectApplicationState(host,
+                                                  ApplicationState.TOKENS,
+                                                  new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(newToken)));
      }
  
 -    private Map.Entry<Range<Token>, Collection<InetAddress>> generatePendingMapEntry(int start, int end, String... endpoints) throws UnknownHostException
 +    private Map.Entry<Range<Token>, EndpointsForRange> generatePendingMapEntry(int start, int end, String... endpoints) throws UnknownHostException
      {
 -        Map<Range<Token>, Collection<InetAddress>> pendingRanges = new HashMap<>();
 -        pendingRanges.put(generateRange(start, end), makeAddrs(endpoints));
 +        Map<Range<Token>, EndpointsForRange> pendingRanges = new HashMap<>();
 +        Range<Token> range = generateRange(start, end);
 +        pendingRanges.put(range, makeReplicas(range, endpoints));
          return pendingRanges.entrySet().iterator().next();
      }
  


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org