You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2014/05/01 15:51:33 UTC

git commit: Support consistent range movements.

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 233761e53 -> 9f60c55ba


Support consistent range movements.

patch by tjake; reviewed by thobbs for CASSANDRA-2434


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9f60c55b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9f60c55b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9f60c55b

Branch: refs/heads/cassandra-2.1
Commit: 9f60c55ba42ff56aa58c3790b9c55924c4deedf4
Parents: 233761e
Author: T Jake Luciani <ja...@apache.org>
Authored: Thu May 1 09:47:22 2014 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Thu May 1 09:47:22 2014 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        |  5 ++
 .../org/apache/cassandra/dht/BootStrapper.java  |  2 +-
 .../org/apache/cassandra/dht/RangeStreamer.java | 81 +++++++++++++++++++-
 .../cassandra/service/StorageService.java       | 44 ++++++++++-
 5 files changed, 127 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f60c55b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 34533cc..be72ad1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -56,6 +56,7 @@
  * Optimize cellname comparison (CASSANDRA-6934)
  * Native protocol v3 (CASSANDRA-6855)
  * Optimize Cell liveness checks and clean up Cell (CASSANDRA-7119)
+ * Support consistent range movements (CASSANDRA-2434)
 Merged from 2.0:
  * Allow overriding cassandra-rackdc.properties file (CASSANDRA-7072)
  * Set JMX RMI port to 7199 (CASSANDRA-7087)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f60c55b/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 86c6f64..5d59460 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -30,6 +30,11 @@ New features
      repair session. Use nodetool repair -par -inc to use this feature.
      A tool to manually mark/unmark sstables as repaired is available in
      tools/bin/sstablerepairedset.
+   - Bootstrapping now ensures that range movements are consistent,
+     meaning the data for the new node is taken from the node that is no
+     longer responsible for that range of keys.
+     If you want the old behavior (due to a lost node, perhaps),
+     set the system property -Dconsistent.rangemovement=false.
 
 Upgrading
 ---------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f60c55b/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 343748b..cbbd100 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -63,7 +63,7 @@ public class BootStrapper
         if (logger.isDebugEnabled())
             logger.debug("Beginning bootstrap process");
 
-        RangeStreamer streamer = new RangeStreamer(tokenMetadata, address, "Bootstrap");
+        RangeStreamer streamer = new RangeStreamer(tokenMetadata, tokens, address, "Bootstrap");
         streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
 
         for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f60c55b/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 7ab39a4..2308d30 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -23,6 +23,8 @@ import java.util.*;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import org.apache.cassandra.gms.EndpointState;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
@@ -44,7 +47,8 @@ import org.apache.cassandra.utils.FBUtilities;
 public class RangeStreamer
 {
     private static final Logger logger = LoggerFactory.getLogger(RangeStreamer.class);
-
+    public static final boolean useStrictConsistency = Boolean.valueOf(System.getProperty("consistent.rangemovement","true"));
+    private final Collection<Token> tokens;
     private final TokenMetadata metadata;
     private final InetAddress address;
     private final String description;
@@ -99,9 +103,19 @@ public class RangeStreamer
         }
     }
 
+    public RangeStreamer(TokenMetadata metadata, Collection<Token> tokens, InetAddress address, String description)
+    {
+        this.metadata = metadata;
+        this.tokens = tokens;
+        this.address = address;
+        this.description = description;
+        this.streamPlan = new StreamPlan(description);
+    }
+
     public RangeStreamer(TokenMetadata metadata, InetAddress address, String description)
     {
         this.metadata = metadata;
+        this.tokens = null;
         this.address = address;
         this.description = description;
         this.streamPlan = new StreamPlan(description);
@@ -114,11 +128,12 @@ public class RangeStreamer
 
     public void addRanges(String keyspaceName, Collection<Range<Token>> ranges)
     {
-        Multimap<Range<Token>, InetAddress> rangesForKeyspace = getAllRangesWithSourcesFor(keyspaceName, ranges);
+        Multimap<Range<Token>, InetAddress> rangesForKeyspace = useStrictConsistency && tokens != null
+                ? getAllRangesWithStrictSourcesFor(keyspaceName, ranges) : getAllRangesWithSourcesFor(keyspaceName, ranges);
 
         if (logger.isDebugEnabled())
         {
-            for (Map.Entry<Range<Token>, InetAddress> entry: rangesForKeyspace.entries())
+            for (Map.Entry<Range<Token>, InetAddress> entry : rangesForKeyspace.entries())
                 logger.debug(String.format("%s: range %s exists on %s", description, entry.getKey(), entry.getValue()));
         }
 
@@ -163,6 +178,66 @@ public class RangeStreamer
     }
 
     /**
+     * Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges.
+     * For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
+     * consistency.
+     */
+    private Multimap<Range<Token>, InetAddress> getAllRangesWithStrictSourcesFor(String table, Collection<Range<Token>> desiredRanges)
+    {
+
+        assert tokens != null;
+        AbstractReplicationStrategy strat = Keyspace.open(table).getReplicationStrategy();
+
+        //Active ranges
+        TokenMetadata metadataClone = metadata.cloneOnlyTokenMap();
+        Multimap<Range<Token>,InetAddress> addressRanges = strat.getRangeAddresses(metadataClone);
+
+        //Pending ranges
+        metadataClone.updateNormalTokens(tokens, address);
+        Multimap<Range<Token>,InetAddress> pendingRangeAddresses = strat.getRangeAddresses(metadataClone);
+
+        //Collects the source that will have its range moved to the new node
+        Multimap<Range<Token>, InetAddress> rangeSources = ArrayListMultimap.create();
+
+        for (Range<Token> desiredRange : desiredRanges)
+        {
+            for (Map.Entry<Range<Token>, Collection<InetAddress>> preEntry : addressRanges.asMap().entrySet())
+            {
+                if (preEntry.getKey().contains(desiredRange))
+                {
+                    Set<InetAddress> oldEndpoints = Sets.newHashSet(preEntry.getValue());
+                    Set<InetAddress> newEndpoints = Sets.newHashSet(pendingRangeAddresses.get(desiredRange));
+
+                    //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
+                    //So we need to be careful to only be strict when endpoints == RF
+                    if (oldEndpoints.size() == strat.getReplicationFactor())
+                    {
+                        oldEndpoints.removeAll(newEndpoints);
+                        assert oldEndpoints.size() == 1 : "Expected 1 endpoint but found " + oldEndpoints.size();
+                    }
+
+                    rangeSources.put(desiredRange, oldEndpoints.iterator().next());
+                }
+            }
+
+            //Validate
+            Collection<InetAddress> addressList = rangeSources.get(desiredRange);
+            if (addressList == null || addressList.isEmpty())
+                throw new IllegalStateException("No sources found for " + desiredRange);
+
+            if (addressList.size() > 1)
+                throw new IllegalStateException("Multiple endpoints found for " + desiredRange);
+
+            InetAddress sourceIp = addressList.iterator().next();
+            EndpointState sourceState = Gossiper.instance.getEndpointStateForEndpoint(sourceIp);
+            if (Gossiper.instance.isEnabled() && (sourceState == null || !sourceState.isAlive()))
+                throw new RuntimeException("A node required to move the data consistently is down ("+sourceIp+").  If you wish to move the data from a potentially inconsistent replica, restart the node with -Dconsistent.rangemovement=false");
+        }
+
+        return rangeSources;
+    }
+
+    /**
      * @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value)
      * @param sourceFilters A (possibly empty) collection of source filters to apply. In addition to any filters given
      *                      here, we always exclude ourselves.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f60c55b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 25a3670..85c080e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3217,7 +3217,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     // getting collection of the currently used ranges by this keyspace
                     Collection<Range<Token>> currentRanges = getRangesForEndpoint(keyspace, localAddress);
                     // collection of ranges which this node will serve after move to the new token
-                    Collection<Range<Token>> updatedRanges = strategy.getPendingAddressRanges(tokenMetadata, newToken, localAddress);
+                    Collection<Range<Token>> updatedRanges = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
 
                     // ring ranges and endpoints associated with them
                     // this used to determine what nodes should we ping about range data
@@ -3237,11 +3237,51 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                         {
                             if (range.contains(toFetch))
                             {
-                                List<InetAddress> endpoints = snitch.getSortedListByProximity(localAddress, rangeAddresses.get(range));
+                                List<InetAddress> endpoints = null;
+
+                                if (RangeStreamer.useStrictConsistency)
+                                {
+                                    Set<InetAddress> oldEndpoints = Sets.newHashSet(rangeAddresses.get(range));
+                                    Set<InetAddress> newEndpoints = Sets.newHashSet(strategy.calculateNaturalEndpoints(toFetch.right, tokenMetaCloneAllSettled));
+
+                                    //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
+                                    //So we need to be careful to only be strict when endpoints == RF
+                                    if (oldEndpoints.size() == strategy.getReplicationFactor())
+                                    {
+                                        oldEndpoints.removeAll(newEndpoints);
+
+                                        //No relocation required 
+                                        if (oldEndpoints.isEmpty())
+                                            continue;
+
+                                        assert oldEndpoints.size() == 1 : "Expected 1 endpoint but found " + oldEndpoints.size();
+                                    }
+
+                                    endpoints = Lists.newArrayList(oldEndpoints.iterator().next());
+                                }
+                                else
+                                {
+                                    endpoints = snitch.getSortedListByProximity(localAddress, rangeAddresses.get(range));
+                                }
+
                                 // storing range and preferred endpoint set
                                 rangesToFetchWithPreferredEndpoints.putAll(toFetch, endpoints);
                             }
                         }
+
+                        Collection<InetAddress> addressList = rangesToFetchWithPreferredEndpoints.get(toFetch);
+                        if (addressList == null || addressList.isEmpty())
+                            continue;
+
+                        if (RangeStreamer.useStrictConsistency)
+                        {
+                            if (addressList.size() > 1)
+                                throw new IllegalStateException("Multiple strict sources found for " + toFetch);
+
+                            InetAddress sourceIp = addressList.iterator().next();
+                            if (Gossiper.instance.isEnabled() && !Gossiper.instance.getEndpointStateForEndpoint(sourceIp).isAlive())
+                                throw new RuntimeException("A node required to move the data consistently is down ("+sourceIp+").  If you wish to move the data from a potentially inconsistent replica, restart the node with -Dconsistent.rangemovement=false");
+                        }
                     }
 
                     // calculating endpoints to stream current ranges to if needed