You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2014/05/01 15:51:33 UTC
git commit: Support consistent range movements.
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 233761e53 -> 9f60c55ba
Support consistent range movements.
patch by tjake; reviewed by thobbs for CASSANDRA-2434
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9f60c55b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9f60c55b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9f60c55b
Branch: refs/heads/cassandra-2.1
Commit: 9f60c55ba42ff56aa58c3790b9c55924c4deedf4
Parents: 233761e
Author: T Jake Luciani <ja...@apache.org>
Authored: Thu May 1 09:47:22 2014 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Thu May 1 09:47:22 2014 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 5 ++
.../org/apache/cassandra/dht/BootStrapper.java | 2 +-
.../org/apache/cassandra/dht/RangeStreamer.java | 81 +++++++++++++++++++-
.../cassandra/service/StorageService.java | 44 ++++++++++-
5 files changed, 127 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f60c55b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 34533cc..be72ad1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -56,6 +56,7 @@
* Optimize cellname comparison (CASSANDRA-6934)
* Native protocol v3 (CASSANDRA-6855)
* Optimize Cell liveness checks and clean up Cell (CASSANDRA-7119)
+ * Support consistent range movements (CASSANDRA-2434)
Merged from 2.0:
* Allow overriding cassandra-rackdc.properties file (CASSANDRA-7072)
* Set JMX RMI port to 7199 (CASSANDRA-7087)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f60c55b/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 86c6f64..5d59460 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -30,6 +30,11 @@ New features
repair session. Use nodetool repair -par -inc to use this feature.
A tool to manually mark/unmark sstables as repaired is available in
tools/bin/sstablerepairedset.
+ - Bootstrapping now ensures that range movements are consistent,
+ meaning the data for the new node is taken from the node that is no
+ longer responsible for that range of keys.
+ If you want the old behavior (e.g. because a replica has been lost),
+ you can set the following property: -Dconsistent.rangemovement=false
Upgrading
---------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f60c55b/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 343748b..cbbd100 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -63,7 +63,7 @@ public class BootStrapper
if (logger.isDebugEnabled())
logger.debug("Beginning bootstrap process");
- RangeStreamer streamer = new RangeStreamer(tokenMetadata, address, "Bootstrap");
+ RangeStreamer streamer = new RangeStreamer(tokenMetadata, tokens, address, "Bootstrap");
streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f60c55b/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 7ab39a4..2308d30 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -23,6 +23,8 @@ import java.util.*;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import org.apache.cassandra.gms.EndpointState;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
@@ -44,7 +47,8 @@ import org.apache.cassandra.utils.FBUtilities;
public class RangeStreamer
{
private static final Logger logger = LoggerFactory.getLogger(RangeStreamer.class);
-
+ public static final boolean useStrictConsistency = Boolean.valueOf(System.getProperty("consistent.rangemovement","true"));
+ private final Collection<Token> tokens;
private final TokenMetadata metadata;
private final InetAddress address;
private final String description;
@@ -99,9 +103,19 @@ public class RangeStreamer
}
}
+ public RangeStreamer(TokenMetadata metadata, Collection<Token> tokens, InetAddress address, String description)
+ {
+ this.metadata = metadata;
+ this.tokens = tokens; // new tokens for address; when non-null (and consistent.rangemovement is on) addRanges uses strict sources
+ this.address = address;
+ this.description = description;
+ this.streamPlan = new StreamPlan(description);
+ }
+
public RangeStreamer(TokenMetadata metadata, InetAddress address, String description)
{
this.metadata = metadata;
+ this.tokens = null;
this.address = address;
this.description = description;
this.streamPlan = new StreamPlan(description);
@@ -114,11 +128,12 @@ public class RangeStreamer
public void addRanges(String keyspaceName, Collection<Range<Token>> ranges)
{
- Multimap<Range<Token>, InetAddress> rangesForKeyspace = getAllRangesWithSourcesFor(keyspaceName, ranges);
+ Multimap<Range<Token>, InetAddress> rangesForKeyspace = useStrictConsistency && tokens != null
+ ? getAllRangesWithStrictSourcesFor(keyspaceName, ranges) : getAllRangesWithSourcesFor(keyspaceName, ranges);
if (logger.isDebugEnabled())
{
- for (Map.Entry<Range<Token>, InetAddress> entry: rangesForKeyspace.entries())
+ for (Map.Entry<Range<Token>, InetAddress> entry : rangesForKeyspace.entries())
logger.debug(String.format("%s: range %s exists on %s", description, entry.getKey(), entry.getValue()));
}
@@ -163,6 +178,66 @@ public class RangeStreamer
}
/**
+ * For each desired range, find the one current replica that will no longer own it once this node's tokens are added.
+ * Streaming only from that replica keeps the move consistent. If a range has fewer current replicas than RF, an arbitrary
+ * current replica is used. Throws if zero or several sources match a range, or (gossip enabled) if the source is not alive.
+ */
+ private Multimap<Range<Token>, InetAddress> getAllRangesWithStrictSourcesFor(String table, Collection<Range<Token>> desiredRanges)
+ {
+
+ assert tokens != null;
+ AbstractReplicationStrategy strat = Keyspace.open(table).getReplicationStrategy();
+
+ //Current ranges: range -> replicas before this node's tokens are added (token map clone only)
+ TokenMetadata metadataClone = metadata.cloneOnlyTokenMap();
+ Multimap<Range<Token>,InetAddress> addressRanges = strat.getRangeAddresses(metadataClone);
+
+ //Pending ranges: range -> replicas after adding this node's tokens to the same clone
+ metadataClone.updateNormalTokens(tokens, address);
+ Multimap<Range<Token>,InetAddress> pendingRangeAddresses = strat.getRangeAddresses(metadataClone);
+
+ //For each desired range, the single replica that will hand that range over to the new node
+ Multimap<Range<Token>, InetAddress> rangeSources = ArrayListMultimap.create();
+
+ for (Range<Token> desiredRange : desiredRanges)
+ {
+ for (Map.Entry<Range<Token>, Collection<InetAddress>> preEntry : addressRanges.asMap().entrySet())
+ {
+ if (preEntry.getKey().contains(desiredRange))
+ {
+ Set<InetAddress> oldEndpoints = Sets.newHashSet(preEntry.getValue());
+ Set<InetAddress> newEndpoints = Sets.newHashSet(pendingRangeAddresses.get(desiredRange));
+
+ //Due to CASSANDRA-5953 we can have a higher RF than we have endpoints.
+ //So we need to be careful to only be strict when endpoints == RF
+ if (oldEndpoints.size() == strat.getReplicationFactor())
+ {
+ oldEndpoints.removeAll(newEndpoints);
+ assert oldEndpoints.size() == 1 : "Expected 1 endpoint but found " + oldEndpoints.size();
+ }
+
+ rangeSources.put(desiredRange, oldEndpoints.iterator().next());
+ }
+ }
+
+ //Validate: exactly one source per range, and (when gossip is enabled) that source must be alive
+ Collection<InetAddress> addressList = rangeSources.get(desiredRange);
+ if (addressList == null || addressList.isEmpty())
+ throw new IllegalStateException("No sources found for " + desiredRange);
+
+ if (addressList.size() > 1)
+ throw new IllegalStateException("Multiple endpoints found for " + desiredRange);
+
+ InetAddress sourceIp = addressList.iterator().next();
+ EndpointState sourceState = Gossiper.instance.getEndpointStateForEndpoint(sourceIp);
+ if (Gossiper.instance.isEnabled() && (sourceState == null || !sourceState.isAlive()))
+ throw new RuntimeException("A node required to move the data consistently is down ("+sourceIp+"). If you wish to move the data from a potentially inconsistent replica, restart the node with -Dconsistent.rangemovement=false");
+ }
+
+ return rangeSources;
+ }
+
+ /**
* @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value)
* @param sourceFilters A (possibly empty) collection of source filters to apply. In addition to any filters given
* here, we always exclude ourselves.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f60c55b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 25a3670..85c080e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3217,7 +3217,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// getting collection of the currently used ranges by this keyspace
Collection<Range<Token>> currentRanges = getRangesForEndpoint(keyspace, localAddress);
// collection of ranges which this node will serve after move to the new token
- Collection<Range<Token>> updatedRanges = strategy.getPendingAddressRanges(tokenMetadata, newToken, localAddress);
+ Collection<Range<Token>> updatedRanges = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
// ring ranges and endpoints associated with them
// this used to determine what nodes should we ping about range data
@@ -3237,11 +3237,51 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
if (range.contains(toFetch))
{
- List<InetAddress> endpoints = snitch.getSortedListByProximity(localAddress, rangeAddresses.get(range));
+ List<InetAddress> endpoints = null;
+
+ if (RangeStreamer.useStrictConsistency)
+ {
+ Set<InetAddress> oldEndpoints = Sets.newHashSet(rangeAddresses.get(range));
+ Set<InetAddress> newEndpoints = Sets.newHashSet(strategy.calculateNaturalEndpoints(toFetch.right, tokenMetaCloneAllSettled));
+
+ //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
+ //So we need to be careful to only be strict when endpoints == RF
+ if (oldEndpoints.size() == strategy.getReplicationFactor())
+ {
+ oldEndpoints.removeAll(newEndpoints);
+
+ //No relocation required
+ if (oldEndpoints.isEmpty())
+ continue;
+
+ assert oldEndpoints.size() == 1 : "Expected 1 endpoint but found " + oldEndpoints.size();
+ }
+
+ endpoints = Lists.newArrayList(oldEndpoints.iterator().next());
+ }
+ else
+ {
+ endpoints = snitch.getSortedListByProximity(localAddress, rangeAddresses.get(range));
+ }
+
// storing range and preferred endpoint set
rangesToFetchWithPreferredEndpoints.putAll(toFetch, endpoints);
}
}
+
+ Collection<InetAddress> addressList = rangesToFetchWithPreferredEndpoints.get(toFetch);
+ if (addressList == null || addressList.isEmpty())
+ continue;
+
+ if (RangeStreamer.useStrictConsistency)
+ {
+ if (addressList.size() > 1)
+ throw new IllegalStateException("Multiple strict sources found for " + toFetch);
+
+ InetAddress sourceIp = addressList.iterator().next();
+ if (Gossiper.instance.isEnabled() && !Gossiper.instance.getEndpointStateForEndpoint(sourceIp).isAlive())
+ throw new RuntimeException("A node required to move the data consistently is down ("+sourceIp+"). If you wish to move the data from a potentially inconsistent replica, restart the node with -Dconsistent.rangemovement=false");
+ }
}
// calculating endpoints to stream current ranges to if needed