You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2012/09/14 17:16:18 UTC

[4/5] git commit: p/4443/020_calculate_pending

p/4443/020_calculate_pending

Wire a collection of relocating tokens into the calculation
of endpoints for pending ranges.

Patch by eevans; reviewed by Brandon Williams for CASSANDRA-4559


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9f643a4a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9f643a4a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9f643a4a

Branch: refs/heads/trunk
Commit: 9f643a4ae62200d47fbe64f2b44260b767449767
Parents: 1f36519
Author: Eric Evans <ee...@apache.org>
Authored: Fri Sep 14 10:08:53 2012 -0500
Committer: Eric Evans <ee...@apache.org>
Committed: Fri Sep 14 10:11:00 2012 -0500

----------------------------------------------------------------------
 .../apache/cassandra/locator/TokenMetadata.java    |   91 ++++++++++++++-
 .../apache/cassandra/service/StorageService.java   |   16 +++-
 2 files changed, 105 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f643a4a/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index c666c47..0ecb125 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -84,6 +84,8 @@ public class TokenMetadata
     // nodes which are migrating to the new tokens in the ring
     private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<Pair<Token, InetAddress>>();
 
+    // tokens which are migrating to new endpoints
+    private final Map<Token, InetAddress> relocatingTokens = new HashMap<Token, InetAddress>();
 
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -347,6 +349,33 @@ public class TokenMetadata
         }
     }
 
+    /**
+     * Record tokens as relocating (moving from their respective endpoints) to a single destination endpoint.
+     * @param tokens tokens being moved; asserted to be non-null and non-empty
+     * @param endpoint destination of moves; asserted to be non-null
+     */
+    public void addRelocatingTokens(Collection<Token> tokens, InetAddress endpoint)
+    {
+        assert endpoint != null;
+        assert tokens != null && tokens.size() > 0;
+
+        lock.writeLock().lock(); // relocatingTokens is mutated below, so take the write side of the lock
+
+        try
+        {
+            for (Token token : tokens)
+            {
+                InetAddress prev = relocatingTokens.put(token, endpoint); // put() yields the destination previously recorded for this token, or null
+                if (prev != null && !prev.equals(endpoint)) // re-adding the same destination is silent; a changed destination is still overwritten, but logged
+                    logger.warn("Relocation of {} to {} overwrites previous to {}", new Object[]{token, endpoint, prev});
+            }
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
     public void removeEndpoint(InetAddress endpoint)
     {
         assert endpoint != null;
@@ -396,6 +425,38 @@ public class TokenMetadata
         }
     }
 
+    /**
+     * Remove {@code token} from the relocating tokens; the entry is removed even if its recorded destination is not {@code endpoint}.
+     * @param endpoint destination the caller expects for {@code token}; a different recorded destination is only logged as a warning
+     */
+    public void removeFromRelocating(Token token, InetAddress endpoint)
+    {
+        assert endpoint != null;
+        assert token != null;
+
+        lock.writeLock().lock(); // relocatingTokens is mutated below, so take the write side of the lock
+
+        try
+        {
+            InetAddress previous = relocatingTokens.remove(token); // unconditional removal; yields the recorded destination, or null if absent
+
+            if (previous == null) // nothing was recorded for this token
+            {
+                logger.debug("Cannot remove {}, not found among the relocating (previously removed?)", token);
+            }
+            else if (!previous.equals(endpoint)) // the entry is already gone at this point; the mismatch is reported, not undone
+            {
+                logger.warn(
+                        "Removal of relocating token {} with mismatched endpoint ({} != {})",
+                        new Object[]{token, endpoint, previous});
+            }
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
     public Collection<Token> getTokens(InetAddress endpoint)
     {
         assert endpoint != null;
@@ -470,6 +531,22 @@ public class TokenMetadata
         }
     }
 
+    public boolean isRelocating(Token token) // true if token currently has an entry in relocatingTokens
+    {
+        assert token != null;
+
+        lock.readLock().lock(); // lookup only, so the read side of the lock is taken
+
+        try
+        {
+            return relocatingTokens.containsKey(token); // membership is by key; the destination endpoint is not consulted
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+    }
+
     /**
      * Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges,
      * bootstrap tokens and leaving endpoints are not included in the copy.
@@ -513,7 +590,7 @@ public class TokenMetadata
 
     /**
      * Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all
-     * current leave and move operations have finished.
+     * current leave, move, and relocate operations have finished.
      *
      * @return new token metadata
      */
@@ -532,6 +609,9 @@ public class TokenMetadata
             for (Pair<Token, InetAddress> pair : movingEndpoints)
                 metadata.updateNormalToken(pair.left, pair.right);
 
+            for (Map.Entry<Token, InetAddress> relocating: relocatingTokens.entrySet())
+                metadata.updateNormalToken(relocating.getKey(), relocating.getValue());
+
             return metadata;
         }
         finally
@@ -654,6 +734,15 @@ public class TokenMetadata
         return movingEndpoints;
     }
 
+    /**
+     * Tokens which are migrating to new endpoints.
+     * @return the relocatingTokens map itself (not a copy), mapping each relocating token to its destination endpoint
+     */
+    public Map<Token, InetAddress> getRelocatingRanges()
+    {
+        return relocatingTokens; // NOTE(review): handed out without taking the lock or copying, so callers see the live map -- confirm this is intended
+    }
+
     public static int firstTokenIndex(final ArrayList ring, Token start, boolean insertMin)
     {
         assert ring.size() > 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f643a4a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 88780dd..07ee2bd 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1525,7 +1525,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         }
 
         // At this stage pendingRanges has been updated according to leaving and bootstrapping nodes.
-        // We can now finish the calculation by checking moving nodes.
+        // We can now finish the calculation by checking moving nodes and relocating tokens.
 
         // For each of the moving nodes, we do the same thing we did for bootstrapping:
         // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
@@ -1544,6 +1544,20 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             allLeftMetadata.removeEndpoint(endpoint);
         }
 
+        // Ranges being relocated.
+        for (Map.Entry<Token, InetAddress> relocating : tm.getRelocatingRanges().entrySet())
+        {
+            InetAddress endpoint = relocating.getValue(); // destination endpoint of the relocating token
+            Token token = relocating.getKey();
+
+            allLeftMetadata.updateNormalToken(token, endpoint);
+
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+                pendingRanges.put(range, endpoint);
+
+            allLeftMetadata.removeEndpoint(endpoint);
+        }
+
         tm.setPendingRanges(table, pendingRanges);
 
         if (logger.isDebugEnabled())