You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2019/06/03 15:05:06 UTC

[cassandra] branch trunk updated (797ec05 -> cc1bc2a)

This is an automated email from the ASF dual-hosted git repository.

samt pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 797ec05  Merge branch 'cassandra-3.11' into trunk
     new e4b5d98  Update token metadata for non-normal state changes
     new 8db0d84  Merge branch 'cassandra-3.0' into cassandra-3.11
     new cc1bc2a  Merge branch 'cassandra-3.11' into trunk

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/cassandra/service/StorageService.java   | 156 ++++++++++++---------
 .../org/apache/cassandra/service/MoveTest.java     |  11 +-
 2 files changed, 96 insertions(+), 71 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit cc1bc2a8e1bcac8cee08289ca04d6490a95f225e
Merge: 797ec05 8db0d84
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Mon Jun 3 16:02:36 2019 +0100

    Merge branch 'cassandra-3.11' into trunk

 .../apache/cassandra/service/StorageService.java   | 156 ++++++++++++---------
 .../org/apache/cassandra/service/MoveTest.java     |  11 +-
 2 files changed, 96 insertions(+), 71 deletions(-)

diff --cc src/java/org/apache/cassandra/service/StorageService.java
index eade7dd,c340db6..2b6bf1d
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2457,18 -2311,95 +2457,94 @@@ public class StorageService extends Not
          tokenMetadata.updateHostId(Gossiper.instance.getHostId(newNode), newNode);
      }
  
 -    private void ensureUpToDateTokenMetadata(String status, InetAddress endpoint)
++    private void ensureUpToDateTokenMetadata(String status, InetAddressAndPort endpoint)
+     {
+         Set<Token> tokens = new TreeSet<>(getTokensFor(endpoint));
+ 
+         if (logger.isDebugEnabled())
+             logger.debug("Node {} state {}, tokens {}", endpoint, status, tokens);
+ 
+         // If the node is previously unknown or tokens do not match, update tokenmetadata to
+         // have this node as 'normal' (it must have been using this token before the
+         // leave). This way we'll get pending ranges right.
+         if (!tokenMetadata.isMember(endpoint))
+         {
+             logger.info("Node {} state jump to {}", endpoint, status);
+             updateTokenMetadata(endpoint, tokens);
+         }
+         else if (!tokens.equals(new TreeSet<>(tokenMetadata.getTokens(endpoint))))
+         {
+             logger.warn("Node {} '{}' token mismatch. Long network partition?", endpoint, status);
+             updateTokenMetadata(endpoint, tokens);
+         }
+     }
+ 
 -    private void updateTokenMetadata(InetAddress endpoint, Iterable<Token> tokens)
++    private void updateTokenMetadata(InetAddressAndPort endpoint, Iterable<Token> tokens)
+     {
+         updateTokenMetadata(endpoint, tokens, new HashSet<>());
+     }
+ 
 -    private void updateTokenMetadata(InetAddress endpoint, Iterable<Token> tokens, Set<InetAddress> endpointsToRemove)
++    private void updateTokenMetadata(InetAddressAndPort endpoint, Iterable<Token> tokens, Set<InetAddressAndPort> endpointsToRemove)
+     {
+         Set<Token> tokensToUpdateInMetadata = new HashSet<>();
+         Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
+ 
+         for (final Token token : tokens)
+         {
+             // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
 -            InetAddress currentOwner = tokenMetadata.getEndpoint(token);
++            InetAddressAndPort currentOwner = tokenMetadata.getEndpoint(token);
+             if (currentOwner == null)
+             {
+                 logger.debug("New node {} at token {}", endpoint, token);
+                 tokensToUpdateInMetadata.add(token);
+                 tokensToUpdateInSystemKeyspace.add(token);
+             }
+             else if (endpoint.equals(currentOwner))
+             {
+                 // set state back to normal, since the node may have tried to leave, but failed and is now back up
+                 tokensToUpdateInMetadata.add(token);
+                 tokensToUpdateInSystemKeyspace.add(token);
+             }
+             else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
+             {
+                 tokensToUpdateInMetadata.add(token);
+                 tokensToUpdateInSystemKeyspace.add(token);
+ 
+                 // currentOwner is no longer current, endpoint is.  Keep track of these moves, because when
+                 // a host no longer has any tokens, we'll want to remove it.
 -                Multimap<InetAddress, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
++                Multimap<InetAddressAndPort, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
+                 epToTokenCopy.get(currentOwner).remove(token);
+                 if (epToTokenCopy.get(currentOwner).isEmpty())
+                     endpointsToRemove.add(currentOwner);
+ 
+                 logger.info("Nodes {} and {} have the same token {}. {} is the new owner", endpoint, currentOwner, token, endpoint);
+             }
+             else
+             {
+                 logger.info("Nodes () and {} have the same token {}.  Ignoring {}", endpoint, currentOwner, token, endpoint);
+             }
+         }
+ 
+         tokenMetadata.updateNormalTokens(tokensToUpdateInMetadata, endpoint);
 -        for (InetAddress ep : endpointsToRemove)
++        for (InetAddressAndPort ep : endpointsToRemove)
+         {
+             removeEndpoint(ep);
+             if (replacing && ep.equals(DatabaseDescriptor.getReplaceAddress()))
+                 Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
+         }
+         if (!tokensToUpdateInSystemKeyspace.isEmpty())
 -            SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace, StageManager.getStage(Stage.MUTATION));
++            SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace);
+     }
 -
      /**
       * Handle node move to normal state. That is, node is entering token ring and participating
       * in reads.
       *
       * @param endpoint node
       */
 -    private void handleStateNormal(final InetAddress endpoint, final String status)
 +    private void handleStateNormal(final InetAddressAndPort endpoint, final String status)
      {
          Collection<Token> tokens = getTokensFor(endpoint);
-         Set<Token> tokensToUpdateInMetadata = new HashSet<>();
-         Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
 -        Set<InetAddress> endpointsToRemove = new HashSet<>();
 +        Set<InetAddressAndPort> endpointsToRemove = new HashSet<>();
  
          if (logger.isDebugEnabled())
              logger.debug("Node {} state {}, token {}", endpoint, status, tokens);
@@@ -2610,13 -2490,8 +2635,8 @@@
       *
       * @param endpoint node
       */
 -    private void handleStateLeaving(InetAddress endpoint)
 +    private void handleStateLeaving(InetAddressAndPort endpoint)
      {
-         Collection<Token> tokens = getTokensFor(endpoint);
- 
-         if (logger.isDebugEnabled())
-             logger.debug("Node {} state leaving, tokens {}", endpoint, tokens);
- 
          // If the node is previously unknown or tokens do not match, update tokenmetadata to
          // have this node as 'normal' (it must have been using this token before the
          // leave). This way we'll get pending ranges right.
@@@ -2660,8 -2527,10 +2672,10 @@@
       * @param endpoint moving endpoint address
       * @param pieces STATE_MOVING, token
       */
 -    private void handleStateMoving(InetAddress endpoint, String[] pieces)
 +    private void handleStateMoving(InetAddressAndPort endpoint, String[] pieces)
      {
+         ensureUpToDateTokenMetadata(VersionedValue.STATUS_MOVING, endpoint);
+ 
          assert pieces.length >= 2;
          Token token = getTokenFactory().fromString(pieces[1]);
  
diff --cc test/unit/org/apache/cassandra/service/MoveTest.java
index a7cfc1b,39ebd66..a4da7b8
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@@ -496,14 -480,22 +496,23 @@@ public class MoveTes
      {
          tmd.removeFromMoving(host);
          assertTrue(!tmd.isMoving(host));
-         tmd.updateNormalToken(new BigIntegerToken(String.valueOf(token)), host);
+         Token newToken = new BigIntegerToken(String.valueOf(token));
+         tmd.updateNormalToken(newToken, host);
+         // As well as updating TMD, update the host's tokens in gossip. Since CASSANDRA-15120, status changing to MOVING
+         // ensures that TMD is up to date with token assignments according to gossip. So we need to make sure gossip has
+         // the correct new token, as the moving node itself would do upon successful completion of the move operation.
+         // Without this, the next movement for that host will set the token in TMD back to the old value from gossip,
+         // and incorrect range movements will follow.
+         Gossiper.instance.injectApplicationState(host,
+                                                  ApplicationState.TOKENS,
+                                                  new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(newToken)));
      }
  
 -    private Map.Entry<Range<Token>, Collection<InetAddress>> generatePendingMapEntry(int start, int end, String... endpoints) throws UnknownHostException
 +    private Map.Entry<Range<Token>, EndpointsForRange> generatePendingMapEntry(int start, int end, String... endpoints) throws UnknownHostException
      {
 -        Map<Range<Token>, Collection<InetAddress>> pendingRanges = new HashMap<>();
 -        pendingRanges.put(generateRange(start, end), makeAddrs(endpoints));
 +        Map<Range<Token>, EndpointsForRange> pendingRanges = new HashMap<>();
 +        Range<Token> range = generateRange(start, end);
 +        pendingRanges.put(range, makeReplicas(range, endpoints));
          return pendingRanges.entrySet().iterator().next();
      }
  


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org