You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/09/26 09:43:16 UTC
[2/2] cassandra git commit: Transient replication: range movement
improvements
Transient replication: range movement improvements
Patch by Alex Petrov; reviewed by Ariel Weisberg and Benedict Elliott Smith for CASSANDRA-14756
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0379201c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0379201c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0379201c
Branch: refs/heads/trunk
Commit: 0379201c7057f6bac4abf1e0f3d81a12d90abd08
Parents: 210da3d
Author: Alex Petrov <ol...@gmail.com>
Authored: Mon Sep 17 11:51:56 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Wed Sep 26 11:42:46 2018 +0200
----------------------------------------------------------------------
.../org/apache/cassandra/db/SystemKeyspace.java | 31 +-
.../org/apache/cassandra/dht/BootStrapper.java | 3 -
.../cassandra/dht/RangeFetchMapCalculator.java | 2 +-
.../org/apache/cassandra/dht/RangeStreamer.java | 448 ++++++++++---------
.../apache/cassandra/dht/StreamStateStore.java | 12 +-
.../cassandra/locator/RangesAtEndpoint.java | 6 +
.../cassandra/service/RangeRelocator.java | 324 ++++++++++++++
.../cassandra/service/StorageService.java | 314 +------------
.../apache/cassandra/streaming/StreamPlan.java | 17 +-
.../cassandra/streaming/StreamSession.java | 8 +-
.../apache/cassandra/dht/BootStrapperTest.java | 17 +-
.../dht/RangeFetchMapCalculatorTest.java | 79 +++-
.../locator/OldNetworkTopologyStrategyTest.java | 3 +-
.../service/BootstrapTransientTest.java | 113 +++--
.../cassandra/service/MoveTransientTest.java | 321 +++++++------
.../cassandra/service/StorageServiceTest.java | 18 +-
16 files changed, 981 insertions(+), 735 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index ff070a3..0f904ce 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -32,12 +32,11 @@ import javax.management.openmbean.TabularData;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.locator.Replica;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1285,24 +1284,40 @@ public final class SystemKeyspace
keyspace);
}
- public static synchronized RangesAtEndpoint getAvailableRanges(String keyspace, IPartitioner partitioner)
+ /**
+ * List of the streamed ranges, where transientness is encoded based on the source the range was streamed from.
+ */
+ public static synchronized AvailableRanges getAvailableRanges(String keyspace, IPartitioner partitioner)
{
String query = "SELECT * FROM system.%s WHERE keyspace_name=?";
UntypedResultSet rs = executeInternal(format(query, AVAILABLE_RANGES_V2), keyspace);
- InetAddressAndPort endpoint = InetAddressAndPort.getLocalHost();
- RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(endpoint);
+
+ ImmutableSet.Builder<Range<Token>> full = new ImmutableSet.Builder<>();
+ ImmutableSet.Builder<Range<Token>> trans = new ImmutableSet.Builder<>();
for (UntypedResultSet.Row row : rs)
{
Optional.ofNullable(row.getSet("full_ranges", BytesType.instance))
.ifPresent(full_ranges -> full_ranges.stream()
.map(buf -> byteBufferToRange(buf, partitioner))
- .forEach(range -> builder.add(fullReplica(endpoint, range))));
+ .forEach(full::add));
Optional.ofNullable(row.getSet("transient_ranges", BytesType.instance))
.ifPresent(transient_ranges -> transient_ranges.stream()
.map(buf -> byteBufferToRange(buf, partitioner))
- .forEach(range -> builder.add(transientReplica(endpoint, range))));
+ .forEach(trans::add));
+ }
+ return new AvailableRanges(full.build(), trans.build());
+ }
+
+ public static class AvailableRanges
+ {
+ public Set<Range<Token>> full;
+ public Set<Range<Token>> trans;
+
+ private AvailableRanges(Set<Range<Token>> full, Set<Range<Token>> trans)
+ {
+ this.full = full;
+ this.trans = trans;
}
- return builder.build();
}
public static void resetAvailableRanges()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 92bf8c8..cef605e 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -79,9 +79,6 @@ public class BootStrapper extends ProgressEventNotifierSupport
stateStore,
true,
DatabaseDescriptor.getStreamingConnectionsPerHost());
- streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
- streamer.addSourceFilter(new RangeStreamer.ExcludeLocalNodeFilter());
-
for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces())
{
AbstractReplicationStrategy strategy = Keyspace.open(keyspaceName).getReplicationStrategy();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
index 4b98b97..63265b7 100644
--- a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
+++ b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
@@ -84,7 +84,7 @@ public class RangeFetchMapCalculator
private final Set<Range<Token>> trivialRanges;
public RangeFetchMapCalculator(EndpointsByRange rangesWithSources,
- Collection<Predicate<Replica>> sourceFilters,
+ Collection<RangeStreamer.SourceFilter> sourceFilters,
String keyspace)
{
this.rangesWithSources = rangesWithSources;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index e8aa5d3..f46d665 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -27,9 +27,9 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Multimap;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.EndpointsByReplica;
@@ -53,12 +53,12 @@ import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaCollection;
import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
import static com.google.common.base.Predicates.and;
import static com.google.common.base.Predicates.not;
@@ -87,8 +87,8 @@ public class RangeStreamer
private final InetAddressAndPort address;
/* streaming description */
private final String description;
- private final Multimap<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch = HashMultimap.create();
- private final Set<Predicate<Replica>> sourceFilters = new HashSet<>();
+ private final Map<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch = new HashMap<>();
+ private final List<SourceFilter> sourceFilters = new ArrayList<>();
private final StreamPlan streamPlan;
private final boolean useStrictConsistency;
private final IEndpointSnitch snitch;
@@ -97,6 +97,7 @@ public class RangeStreamer
public static class FetchReplica
{
public final Replica local;
+ // Source replica
public final Replica remote;
public FetchReplica(Replica local, Replica remote)
@@ -135,11 +136,17 @@ public class RangeStreamer
}
}
+ public interface SourceFilter extends Predicate<Replica>
+ {
+ public boolean apply(Replica replica);
+ public String message(Replica replica);
+ }
+
/**
* Source filter which excludes any endpoints that are not alive according to a
* failure detector.
*/
- public static class FailureDetectorSourceFilter implements Predicate<Replica>
+ public static class FailureDetectorSourceFilter implements SourceFilter
{
private final IFailureDetector fd;
@@ -148,16 +155,23 @@ public class RangeStreamer
this.fd = fd;
}
+ @Override
public boolean apply(Replica replica)
{
return fd.isAlive(replica.endpoint());
}
+
+ @Override
+ public String message(Replica replica)
+ {
+ return "Filtered " + replica + " out because it was down";
+ }
}
/**
* Source filter which excludes any endpoints that are not in a specific data center.
*/
- public static class SingleDatacenterFilter implements Predicate<Replica>
+ public static class SingleDatacenterFilter implements SourceFilter
{
private final String sourceDc;
private final IEndpointSnitch snitch;
@@ -168,27 +182,41 @@ public class RangeStreamer
this.snitch = snitch;
}
+ @Override
public boolean apply(Replica replica)
{
return snitch.getDatacenter(replica).equals(sourceDc);
}
+
+ @Override
+ public String message(Replica replica)
+ {
+ return "Filtered " + replica + " out because it does not belong to " + sourceDc + " datacenter";
+ }
}
/**
* Source filter which excludes the current node from source calculations
*/
- public static class ExcludeLocalNodeFilter implements Predicate<Replica>
+ public static class ExcludeLocalNodeFilter implements SourceFilter
{
+ @Override
public boolean apply(Replica replica)
{
return !replica.isLocal();
}
+
+ @Override
+ public String message(Replica replica)
+ {
+ return "Filtered " + replica + " out because it is local";
+ }
}
/**
* Source filter which only includes endpoints contained within a provided set.
*/
- public static class WhitelistedSourcesFilter implements Predicate<Replica>
+ public static class WhitelistedSourcesFilter implements SourceFilter
{
private final Set<InetAddressAndPort> whitelistedSources;
@@ -201,6 +229,12 @@ public class RangeStreamer
{
return whitelistedSources.contains(replica.endpoint());
}
+
+ @Override
+ public String message(Replica replica)
+ {
+ return "Filtered " + replica + " out because it was not whitelisted, whitelisted sources: " + whitelistedSources;
+ }
}
public RangeStreamer(TokenMetadata metadata,
@@ -213,6 +247,21 @@ public class RangeStreamer
boolean connectSequentially,
int connectionsPerHost)
{
+ this(metadata, tokens, address, streamOperation, useStrictConsistency, snitch, stateStore,
+ FailureDetector.instance, connectSequentially, connectionsPerHost);
+ }
+
+ RangeStreamer(TokenMetadata metadata,
+ Collection<Token> tokens,
+ InetAddressAndPort address,
+ StreamOperation streamOperation,
+ boolean useStrictConsistency,
+ IEndpointSnitch snitch,
+ StreamStateStore stateStore,
+ IFailureDetector failureDetector,
+ boolean connectSequentially,
+ int connectionsPerHost)
+ {
Preconditions.checkArgument(streamOperation == StreamOperation.BOOTSTRAP || streamOperation == StreamOperation.REBUILD, streamOperation);
this.metadata = metadata;
this.tokens = tokens;
@@ -223,13 +272,34 @@ public class RangeStreamer
this.snitch = snitch;
this.stateStore = stateStore;
streamPlan.listeners(this.stateStore);
+
+ // We're _always_ filtering out a local node and down sources
+ addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(failureDetector));
+ addSourceFilter(new RangeStreamer.ExcludeLocalNodeFilter());
}
- public void addSourceFilter(Predicate<Replica> filter)
+ public void addSourceFilter(SourceFilter filter)
{
sourceFilters.add(filter);
}
+ // Creates error message from source filters
+ private static String buildErrorMessage(Collection<SourceFilter> sourceFilters, ReplicaCollection<?> replicas)
+ {
+ StringBuilder failureMessage = new StringBuilder();
+ for (Replica r : replicas)
+ {
+ for (SourceFilter filter : sourceFilters)
+ {
+ if (!filter.apply(r))
+ {
+ failureMessage.append(filter.message(r));
+ break;
+ }
+ }
+ }
+ return failureMessage.toString();
+ }
/**
* Add ranges to be streamed for given keyspace.
*
@@ -252,7 +322,6 @@ public class RangeStreamer
for (Map.Entry<Replica, Replica> entry : fetchMap.flattenEntries())
logger.info("{}: range {} exists on {} for keyspace {}", description, entry.getKey(), entry.getValue(), keyspaceName);
-
Multimap<InetAddressAndPort, FetchReplica> workMap;
//Only use the optimized strategy if we don't care about strict sources, have a replication factor > 1, and no
//transient replicas.
@@ -265,10 +334,12 @@ public class RangeStreamer
workMap = getOptimizedWorkMap(fetchMap, sourceFilters, keyspaceName);
}
- toFetch.put(keyspaceName, workMap);
- for (Map.Entry<InetAddressAndPort, Collection<FetchReplica>> entry : workMap.asMap().entrySet())
+ if (toFetch.put(keyspaceName, workMap) != null)
+ throw new IllegalArgumentException("Keyspace is already added to fetch map");
+
+ if (logger.isTraceEnabled())
{
- if (logger.isTraceEnabled())
+ for (Map.Entry<InetAddressAndPort, Collection<FetchReplica>> entry : workMap.asMap().entrySet())
{
for (FetchReplica r : entry.getValue())
logger.trace("{}: range source {} local range {} for keyspace {}", description, r.remote, r.local, keyspaceName);
@@ -289,10 +360,6 @@ public class RangeStreamer
/**
* Wrapper method to assemble the arguments for invoking the implementation with RangeStreamer's parameters
- * @param fetchRanges
- * @param keyspace
- * @param useStrictConsistency
- * @return
*/
private EndpointsByReplica calculateRangesToFetchWithPreferredEndpoints(ReplicaCollection<?> fetchRanges, Keyspace keyspace, boolean useStrictConsistency)
{
@@ -305,7 +372,7 @@ public class RangeStreamer
if (tokens != null)
{
// Pending ranges
- tmdAfter = tmd.cloneOnlyTokenMap();
+ tmdAfter = tmd.cloneOnlyTokenMap();
tmdAfter.updateNormalTokens(tokens, address);
}
else if (useStrictConsistency)
@@ -313,15 +380,14 @@ public class RangeStreamer
throw new IllegalArgumentException("Can't ask for strict consistency and not supply tokens");
}
- return RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(snitch::sortedByProximity,
- strat,
- fetchRanges,
- useStrictConsistency,
- tmd,
- tmdAfter,
- ALIVE_PREDICATE,
- keyspace.getName(),
- sourceFilters);
+ return calculateRangesToFetchWithPreferredEndpoints(snitch::sortedByProximity,
+ strat,
+ fetchRanges,
+ useStrictConsistency,
+ tmd,
+ tmdAfter,
+ keyspace.getName(),
+ sourceFilters);
}
@@ -329,7 +395,6 @@ public class RangeStreamer
* Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges.
* For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
* consistency.
- *
**/
public static EndpointsByReplica
calculateRangesToFetchWithPreferredEndpoints(BiFunction<InetAddressAndPort, EndpointsForRange, EndpointsForRange> snitchGetSortedListByProximity,
@@ -338,165 +403,148 @@ public class RangeStreamer
boolean useStrictConsistency,
TokenMetadata tmdBefore,
TokenMetadata tmdAfter,
- Predicate<Replica> isAlive,
String keyspace,
- Collection<Predicate<Replica>> sourceFilters)
- {
- EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
-
- InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
- logger.debug ("Keyspace: {}", keyspace);
- logger.debug("To fetch RN: {}", fetchRanges);
- logger.debug("Fetch ranges: {}", rangeAddresses);
-
- Predicate<Replica> testSourceFilters = and(sourceFilters);
- Function<EndpointsForRange, EndpointsForRange> sorted =
- endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
-
- //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
- EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
- for (Replica toFetch : fetchRanges)
- {
- //Replica that is sufficient to provide the data we need
- //With strict consistency and transient replication we may end up with multiple types
- //so this isn't used with strict consistency
- Predicate<Replica> isSufficient = r -> (toFetch.isTransient() || r.isFull());
- Predicate<Replica> accept = r ->
- isSufficient.test(r) // is sufficient
- && !r.endpoint().equals(localAddress) // is not self
- && isAlive.test(r); // is alive
-
- logger.debug("To fetch {}", toFetch);
- for (Range<Token> range : rangeAddresses.keySet())
- {
- if (range.contains(toFetch.range()))
- {
- EndpointsForRange oldEndpoints = rangeAddresses.get(range);
-
- //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
- //It could be multiple endpoints and we must fetch from all of them if they are there
- //With transient replication and strict consistency this is to get the full data from a full replica and
- //transient data from the transient replica losing data
- EndpointsForRange sources;
- if (useStrictConsistency)
- {
- //Start with two sets of who replicates the range before and who replicates it after
- EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
- logger.debug("Old endpoints {}", oldEndpoints);
- logger.debug("New endpoints {}", newEndpoints);
-
- //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
- //So we need to be careful to only be strict when endpoints == RF
- if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
- {
- Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
- // Remove new endpoints from old endpoints based on address
- oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
-
- if (!all(oldEndpoints, isAlive))
- throw new IllegalStateException("A node required to move the data consistently is down: "
- + oldEndpoints.filter(not(isAlive)));
-
- if (oldEndpoints.size() > 1)
- throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints);
-
- //If we are transitioning from transient to full and and the set of replicas for the range is not changing
- //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely
- //since we are already a transient replica and the existing replica remains.
- //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
- //So it's an error if we don't find what we need.
- if (oldEndpoints.isEmpty() && toFetch.isTransient())
- {
- throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
- }
-
- if (!any(oldEndpoints, isSufficient))
- {
- // need an additional replica
- EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
- // include all our filters, to ensure we include a matching node
- Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(accept, testSourceFilters)).toJavaUtil();
- if (fullReplica.isPresent())
- oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
- else
- throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + endpointsForRange);
- }
-
- //We have to check the source filters here to see if they will remove any replicas
- //required for strict consistency
- if (!all(oldEndpoints, testSourceFilters))
- throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + oldEndpoints.filter(not(testSourceFilters)));
- }
- else
- {
- oldEndpoints = sorted.apply(oldEndpoints.filter(accept));
- }
-
- //Apply testSourceFilters that were given to us, and establish everything remaining is alive for the strict case
- sources = oldEndpoints.filter(testSourceFilters);
- }
- else
- {
- //Without strict consistency we have given up on correctness so no point in fetching from
- //a random full + transient replica since it's also likely to lose data
- //Also apply testSourceFilters that were given to us so we can safely select a single source
- sources = sorted.apply(rangeAddresses.get(range).filter(and(accept, testSourceFilters)));
- //Limit it to just the first possible source, we don't need more than one and downstream
- //will fetch from every source we supply
- sources = sources.size() > 0 ? sources.subList(0, 1) : sources;
- }
-
- // storing range and preferred endpoint set
- rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE);
- logger.debug("Endpoints to fetch for {} are {}", toFetch, sources);
- }
- }
-
- EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
- if (addressList == null)
- throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch);
-
- /*
- * When we move forwards (shrink our bucket) we are the one losing a range and no one else loses
- * from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica
- * and the other is a transient replica. So we must need fetch from two places in that case for the full range we gain.
- * For a transient range we only need to fetch from one.
- */
- if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1))
- throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList));
-
- //We must have enough stuff to fetch from
- if ((toFetch.isFull() && !any(addressList, Replica::isFull)) || addressList.isEmpty())
- {
- if (strat.getReplicationFactor().allReplicas == 1)
- {
- if (useStrictConsistency)
- {
- logger.warn("A node required to move the data consistently is down");
- throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " +
- "Ensure this keyspace contains replicas in the source datacenter.");
- }
- else
- logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " +
- "Keyspace might be missing data.", toFetch, keyspace);
-
- }
- else
- {
- if (useStrictConsistency)
- logger.warn("A node required to move the data consistently is down");
- throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace);
- }
- }
- }
- return rangesToFetchWithPreferredEndpoints.asImmutableView();
- }
+ Collection<SourceFilter> sourceFilters)
+ {
+ EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
+
+ InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
+ logger.debug ("Keyspace: {}", keyspace);
+ logger.debug("To fetch RN: {}", fetchRanges);
+ logger.debug("Fetch ranges: {}", rangeAddresses);
+
+ Predicate<Replica> testSourceFilters = and(sourceFilters);
+ Function<EndpointsForRange, EndpointsForRange> sorted =
+ endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
+
+ //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
+ EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
+ for (Replica toFetch : fetchRanges)
+ {
+ //Replica that is sufficient to provide the data we need
+ //With strict consistency and transient replication we may end up with multiple types
+ //so this isn't used with strict consistency
+ Predicate<Replica> isSufficient = r -> toFetch.isTransient() || r.isFull();
+
+ logger.debug("To fetch {}", toFetch);
+ for (Range<Token> range : rangeAddresses.keySet())
+ {
+ if (!range.contains(toFetch.range()))
+ continue;
+
+ final EndpointsForRange oldEndpoints = sorted.apply(rangeAddresses.get(range));
+
+ //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
+ //It could be multiple endpoints and we must fetch from all of them if they are there
+ //With transient replication and strict consistency this is to get the full data from a full replica and
+ //transient data from the transient replica losing data
+ EndpointsForRange sources;
+ if (useStrictConsistency)
+ {
+ EndpointsForRange strictEndpoints;
+ //Due to CASSANDRA-5953 we can have a higher RF than we have endpoints.
+ //So we need to be careful to only be strict when endpoints == RF
+ if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
+ {
+ //Start with two sets of who replicates the range before and who replicates it after
+ EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
+ logger.debug("Old endpoints {}", oldEndpoints);
+ logger.debug("New endpoints {}", newEndpoints);
+
+ // Remove new endpoints from old endpoints based on address
+ strictEndpoints = oldEndpoints.without(newEndpoints.endpoints());
+
+ if (strictEndpoints.size() > 1)
+ throw new AssertionError("Expected <= 1 endpoint but found " + strictEndpoints);
+
+ //We have to check the source filters here to see if they will remove any replicas
+ //required for strict consistency
+ if (!all(strictEndpoints, testSourceFilters))
+ throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + buildErrorMessage(sourceFilters, strictEndpoints));
+
+ //If we are transitioning from transient to full and the set of replicas for the range is not changing
+ //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely
+ //since we are already a transient replica and the existing replica remains.
+ //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
+ //So it's an error if we don't find what we need.
+ if (strictEndpoints.isEmpty() && toFetch.isTransient())
+ throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
+
+ if (!any(strictEndpoints, isSufficient))
+ {
+ // need an additional replica; include all our filters, to ensure we include a matching node
+ Optional<Replica> fullReplica = Iterables.<Replica>tryFind(oldEndpoints, and(isSufficient, testSourceFilters)).toJavaUtil();
+ if (fullReplica.isPresent())
+ strictEndpoints = Endpoints.concat(strictEndpoints, EndpointsForRange.of(fullReplica.get()));
+ else
+ throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + buildErrorMessage(sourceFilters, oldEndpoints));
+ }
+ }
+ else
+ {
+ strictEndpoints = sorted.apply(oldEndpoints.filter(and(isSufficient, testSourceFilters)));
+ }
+
+ sources = strictEndpoints;
+ }
+ else
+ {
+ //Without strict consistency we have given up on correctness so no point in fetching from
+ //a random full + transient replica since it's also likely to lose data
+ //Also apply testSourceFilters that were given to us so we can safely select a single source
+ sources = sorted.apply(oldEndpoints.filter(and(isSufficient, testSourceFilters)));
+ //Limit it to just the first possible source, we don't need more than one and downstream
+ //will fetch from every source we supply
+ sources = sources.size() > 0 ? sources.subList(0, 1) : sources;
+ }
+
+ // storing range and preferred endpoint set
+ rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE);
+ logger.debug("Endpoints to fetch for {} are {}", toFetch, sources);
+ }
+
+ EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
+ if (addressList == null)
+ throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch);
+
+ /*
+ * When we move forwards (shrink our bucket) we are the one losing a range and no one else loses
+ * from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica
+ * and the other is a transient replica. So we must need fetch from two places in that case for the full range we gain.
+ * For a transient range we only need to fetch from one.
+ */
+ if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1))
+ throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList));
+
+ //We must have enough stuff to fetch from
+ if (!any(addressList, isSufficient))
+ {
+ if (strat.getReplicationFactor().allReplicas == 1)
+ {
+ if (useStrictConsistency)
+ {
+ logger.warn("A node required to move the data consistently is down");
+ throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " +
+ "Ensure this keyspace contains replicas in the source datacenter.");
+ }
+ else
+ logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " +
+ "Keyspace might be missing data.", toFetch, keyspace);
+ }
+ else
+ {
+ if (useStrictConsistency)
+ logger.warn("A node required to move the data consistently is down");
+ throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace);
+ }
+ }
+ }
+ return rangesToFetchWithPreferredEndpoints.asImmutableView();
+ }
/**
* The preferred endpoint list is the wrong format because it is keyed by Replica (this node) rather than the source
* endpoint we will fetch from which streaming wants.
- * @param preferredEndpoints
- * @return
*/
public static Multimap<InetAddressAndPort, FetchReplica> convertPreferredEndpointsToWorkMap(EndpointsByReplica preferredEndpoints)
{
@@ -505,7 +553,7 @@ public class RangeStreamer
{
for (Replica source : e.getValue())
{
- assert (e.getKey()).isLocal();
+ assert e.getKey().isLocal();
assert !source.isLocal();
workMap.put(source.endpoint(), new FetchReplica(e.getKey(), source));
}
@@ -518,7 +566,8 @@ public class RangeStreamer
* Optimized version that also outputs the final work map
*/
private static Multimap<InetAddressAndPort, FetchReplica> getOptimizedWorkMap(EndpointsByReplica rangesWithSources,
- Collection<Predicate<Replica>> sourceFilters, String keyspace)
+ Collection<SourceFilter> sourceFilters,
+ String keyspace)
{
//For now we just aren't going to use the optimized range fetch map with transient replication to shrink
//the surface area to test and introduce bugs.
@@ -531,10 +580,11 @@ public class RangeStreamer
unwrapped.put(entry.getKey().range(), entry.getValue());
}
- RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(unwrapped.asImmutableView(), sourceFilters, keyspace);
+ EndpointsByRange unwrappedView = unwrapped.asImmutableView();
+ RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(unwrappedView, sourceFilters, keyspace);
Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = calculator.getRangeFetchMap();
logger.info("Output from RangeFetchMapCalculator for keyspace {}", keyspace);
- validateRangeFetchMap(unwrapped.asImmutableView(), rangeFetchMapMap, keyspace);
+ validateRangeFetchMap(unwrappedView, rangeFetchMapMap, keyspace);
//Need to rewrap as Replicas
Multimap<InetAddressAndPort, FetchReplica> wrapped = HashMultimap.create();
@@ -562,9 +612,6 @@ public class RangeStreamer
/**
* Verify that source returned for each range is correct
- * @param rangesWithSources
- * @param rangeFetchMapMap
- * @param keyspace
*/
private static void validateRangeFetchMap(EndpointsByRange rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap, String keyspace)
{
@@ -588,7 +635,7 @@ public class RangeStreamer
// For testing purposes
@VisibleForTesting
- Multimap<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch()
+ Map<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch()
{
return toFetch;
}
@@ -600,16 +647,19 @@ public class RangeStreamer
sources.asMap().forEach((source, fetchReplicas) -> {
// filter out already streamed ranges
- RangesAtEndpoint available = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner);
+ SystemKeyspace.AvailableRanges available = stateStore.getAvailableRanges(keyspace, metadata.partitioner);
Predicate<FetchReplica> isAvailable = fetch -> {
- Replica availableRange = available.byRange().get(fetch.local.range());
- if (availableRange == null)
+ boolean isInFull = available.full.contains(fetch.local.range());
+ boolean isInTrans = available.trans.contains(fetch.local.range());
+
+ if (!isInFull && !isInTrans)
//Range is unavailable
return false;
+
if (fetch.local.isFull())
//For full, pick only replicas with matching transientness
- return availableRange.isFull() == fetch.remote.isFull();
+ return isInFull == fetch.remote.isFull();
// Any transient or full will do
return true;
@@ -617,22 +667,16 @@ public class RangeStreamer
List<FetchReplica> remaining = fetchReplicas.stream().filter(not(isAvailable)).collect(Collectors.toList());
- if (remaining.size() < available.size())
+ if (remaining.size() < available.full.size() + available.trans.size())
{
List<FetchReplica> skipped = fetchReplicas.stream().filter(isAvailable).collect(Collectors.toList());
logger.info("Some ranges of {} are already available. Skipping streaming those ranges. Skipping {}. Fully available {} Transiently available {}",
- fetchReplicas, skipped, available.filter(Replica::isFull).ranges(), available.filter(Replica::isTransient).ranges());
+ fetchReplicas, skipped, available.full, available.trans);
}
if (logger.isTraceEnabled())
logger.trace("{}ing from {} ranges {}", description, source, StringUtils.join(remaining, ", "));
- //At the other end the distinction between full and transient is ignored it just used the transient status
- //of the Replica objects we send to determine what to send. The real reason we have this split down to
- //StreamRequest is that on completion StreamRequest is used to write to the system table tracking
- //what has already been streamed. At that point since we only have the local Replica instances so we don't
- //know what we got from the remote. We preserve that here by splitting based on the remotes transient
- //status.
InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
RangesAtEndpoint full = remaining.stream()
.filter(pair -> pair.remote.isFull())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/dht/StreamStateStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/StreamStateStore.java b/src/java/org/apache/cassandra/dht/StreamStateStore.java
index 3144e81..e62bc04 100644
--- a/src/java/org/apache/cassandra/dht/StreamStateStore.java
+++ b/src/java/org/apache/cassandra/dht/StreamStateStore.java
@@ -20,7 +20,8 @@ package org.apache.cassandra.dht;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.cassandra.locator.RangesAtEndpoint;
+import com.google.common.collect.Streams;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +39,7 @@ public class StreamStateStore implements StreamEventHandler
{
private static final Logger logger = LoggerFactory.getLogger(StreamStateStore.class);
- public RangesAtEndpoint getAvailableRanges(String keyspace, IPartitioner partitioner)
+ public SystemKeyspace.AvailableRanges getAvailableRanges(String keyspace, IPartitioner partitioner)
{
return SystemKeyspace.getAvailableRanges(keyspace, partitioner);
}
@@ -54,8 +55,11 @@ public class StreamStateStore implements StreamEventHandler
@VisibleForTesting
public boolean isDataAvailable(String keyspace, Token token)
{
- RangesAtEndpoint availableRanges = getAvailableRanges(keyspace, token.getPartitioner());
- return availableRanges.ranges().stream().anyMatch(range -> range.contains(token));
+ SystemKeyspace.AvailableRanges availableRanges = getAvailableRanges(keyspace, token.getPartitioner());
+
+ return Streams.concat(availableRanges.full.stream(),
+ availableRanges.trans.stream())
+ .anyMatch(range -> range.contains(token));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
index 1773173..f57c28e 100644
--- a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
+++ b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
@@ -37,6 +37,7 @@ import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;
+import static com.google.common.collect.Iterables.all;
import static org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict.*;
/**
@@ -302,6 +303,11 @@ public class RangesAtEndpoint extends AbstractReplicaCollection<RangesAtEndpoint
.collect(collector(dummy));
}
+ public static boolean isDummyList(RangesAtEndpoint ranges)
+ {
+ return all(ranges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0"));
+ }
+
/**
* @return concatenate two DISJOINT collections together
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/service/RangeRelocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeRelocator.java b/src/java/org/apache/cassandra/service/RangeRelocator.java
new file mode 100644
index 0000000..f2af3db
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/RangeRelocator.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Multimap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.RangeStreamer;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.EndpointsByReplica;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.RangesByEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+@VisibleForTesting
+public class RangeRelocator
+{
+ private static final Logger logger = LoggerFactory.getLogger(StorageService.class);
+
+ private final StreamPlan streamPlan = new StreamPlan(StreamOperation.RELOCATION);
+ private final InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
+ private final TokenMetadata tokenMetaCloneAllSettled;
+ // clone to avoid concurrent modification in calculateNaturalReplicas
+ private final TokenMetadata tokenMetaClone;
+ private final Collection<Token> tokens;
+ private final List<String> keyspaceNames;
+
+
+ RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames, TokenMetadata tmd)
+ {
+ this.tokens = tokens;
+ this.keyspaceNames = keyspaceNames;
+ this.tokenMetaCloneAllSettled = tmd.cloneAfterAllSettled();
+ // clone to avoid concurrent modification in calculateNaturalReplicas
+ this.tokenMetaClone = tmd.cloneOnlyTokenMap();
+ }
+
+ @VisibleForTesting
+ public RangeRelocator()
+ {
+ this.tokens = null;
+ this.keyspaceNames = null;
+ this.tokenMetaCloneAllSettled = null;
+ this.tokenMetaClone = null;
+ }
+
+ /**
+ * Wrapper that supplies accessors to the real implementations of the various dependencies for this method
+ */
+ private static Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> calculateRangesToFetchWithPreferredEndpoints(RangesAtEndpoint fetchRanges,
+ AbstractReplicationStrategy strategy,
+ String keyspace,
+ TokenMetadata tmdBefore,
+ TokenMetadata tmdAfter)
+ {
+ EndpointsByReplica preferredEndpoints =
+ RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(DatabaseDescriptor.getEndpointSnitch()::sortedByProximity,
+ strategy,
+ fetchRanges,
+ StorageService.useStrictConsistency,
+ tmdBefore,
+ tmdAfter,
+ keyspace,
+ Arrays.asList(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance),
+ new RangeStreamer.ExcludeLocalNodeFilter()));
+ return RangeStreamer.convertPreferredEndpointsToWorkMap(preferredEndpoints);
+ }
+
+ /**
+ * Calculates the endpoints to stream current ranges to, if needed.
+ * In some situations the node will handle current ranges as part of the new ranges.
+ **/
+ public static RangesByEndpoint calculateRangesToStreamWithEndpoints(RangesAtEndpoint streamRanges,
+ AbstractReplicationStrategy strat,
+ TokenMetadata tmdBefore,
+ TokenMetadata tmdAfter)
+ {
+ RangesByEndpoint.Mutable endpointRanges = new RangesByEndpoint.Mutable();
+ for (Replica toStream : streamRanges)
+ {
+ //If the range we are sending is full only send it to the new full replica
+ //There will also be a new transient replica we need to send the data to, but not
+ //the repaired data
+ EndpointsForRange oldEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdBefore);
+ EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdAfter);
+ logger.debug("Need to stream {}, current endpoints {}, new endpoints {}", toStream, oldEndpoints, newEndpoints);
+
+ for (Replica newEndpoint : newEndpoints)
+ {
+ Replica oldEndpoint = oldEndpoints.byEndpoint().get(newEndpoint.endpoint());
+
+ // Nothing to do
+ if (newEndpoint.equals(oldEndpoint))
+ continue;
+
+ // Completely new range for this endpoint
+ if (oldEndpoint == null)
+ {
+ if (toStream.isTransient() && newEndpoint.isFull())
+ throw new AssertionError(String.format("Need to stream %s, but only have %s which is transient and not full", newEndpoint, toStream));
+
+ for (Range<Token> intersection : newEndpoint.range().intersectionWith(toStream.range()))
+ {
+ endpointRanges.put(newEndpoint.endpoint(), newEndpoint.decorateSubrange(intersection));
+ }
+ }
+ else
+ {
+ Set<Range<Token>> subsToStream = Collections.singleton(toStream.range());
+
+ //First subtract what we already have
+ if (oldEndpoint.isFull() == newEndpoint.isFull() || oldEndpoint.isFull())
+ subsToStream = toStream.range().subtract(oldEndpoint.range());
+
+ //Now we only stream what is still replicated
+ subsToStream.stream()
+ .flatMap(range -> range.intersectionWith(newEndpoint.range()).stream())
+ .forEach(tokenRange -> endpointRanges.put(newEndpoint.endpoint(), newEndpoint.decorateSubrange(tokenRange)));
+ }
+ }
+ }
+ return endpointRanges.asImmutableView();
+ }
+
+ public void calculateToFromStreams()
+ {
+ logger.debug("Current tmd: {}, Updated tmd: {}", tokenMetaClone, tokenMetaCloneAllSettled);
+
+ for (String keyspace : keyspaceNames)
+ {
+ // replication strategy of the current keyspace
+ AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
+
+ logger.info("Calculating ranges to stream and request for keyspace {}", keyspace);
+ //From what I have seen we only ever call this with a single token from StorageService.move(Token)
+ for (Token newToken : tokens)
+ {
+ Collection<Token> currentTokens = tokenMetaClone.getTokens(localAddress);
+ if (currentTokens.size() > 1 || currentTokens.isEmpty())
+ {
+ throw new AssertionError("Unexpected current tokens: " + currentTokens);
+ }
+
+ // calculated parts of the ranges to request/stream from/to nodes in the ring
+ Pair<RangesAtEndpoint, RangesAtEndpoint> streamAndFetchOwnRanges;
+
+ //In the single node token move there is nothing to do and Range subtraction is broken
+ //so it's easier to just identify this case up front.
+ if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()
+)).size() > 1)
+ {
+ // getting collection of the currently used ranges by this keyspace
+ RangesAtEndpoint currentReplicas = strategy.getAddressReplicas(localAddress);
+
+ // collection of ranges which this node will serve after move to the new token
+ RangesAtEndpoint updatedReplicas = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
+
+ streamAndFetchOwnRanges = calculateStreamAndFetchRanges(currentReplicas, updatedReplicas);
+ }
+ else
+ {
+ streamAndFetchOwnRanges = Pair.create(RangesAtEndpoint.empty(localAddress), RangesAtEndpoint.empty(localAddress));
+ }
+
+ RangesByEndpoint rangesToStream = calculateRangesToStreamWithEndpoints(streamAndFetchOwnRanges.left, strategy, tokenMetaClone, tokenMetaCloneAllSettled);
+ logger.info("Endpoint ranges to stream to " + rangesToStream);
+
+ // stream ranges
+ for (InetAddressAndPort address : rangesToStream.keySet())
+ {
+ logger.debug("Will stream range {} of keyspace {} to endpoint {}", rangesToStream.get(address), keyspace, address);
+ RangesAtEndpoint ranges = rangesToStream.get(address);
+ streamPlan.transferRanges(address, keyspace, ranges);
+ }
+
+ Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> rangesToFetch = calculateRangesToFetchWithPreferredEndpoints(streamAndFetchOwnRanges.right, strategy, keyspace, tokenMetaClone, tokenMetaCloneAllSettled);
+
+ // stream requests
+ rangesToFetch.asMap().forEach((address, sourceAndOurReplicas) -> {
+ RangesAtEndpoint full = sourceAndOurReplicas.stream()
+ .filter(pair -> pair.remote.isFull())
+ .map(pair -> pair.local)
+ .collect(RangesAtEndpoint.collector(localAddress));
+ RangesAtEndpoint trans = sourceAndOurReplicas.stream()
+ .filter(pair -> pair.remote.isTransient())
+ .map(pair -> pair.local)
+ .collect(RangesAtEndpoint.collector(localAddress));
+ logger.debug("Will request range {} of keyspace {} from endpoint {}", rangesToFetch.get(address), keyspace, address);
+ streamPlan.requestRanges(address, keyspace, full, trans);
+ });
+
+ logger.debug("Keyspace {}: work map {}.", keyspace, rangesToFetch);
+ }
+ }
+ }
+
+ /**
+ * Calculate pair of ranges to stream/fetch for given two range collections
+ * (current ranges for keyspace and ranges after move to new token)
+ *
+ * With transient replication the added wrinkle is that if a range transitions from full to transient then
+ * we need to stream the range despite the fact that we are retaining it as transient. Some replica
+ * somewhere needs to transition from transient to full and we will be the source.
+ *
+ * If the range is transient and is transitioning to full then always fetch even if the range was already transient
+ * since a transiently replicated replica obviously needs to fetch data to become full.
+ *
+ * This is why there is a continue after checking for intersection, because intersection is not sufficient reason
+ * to do the subtraction since we might need to stream/fetch data anyways.
+ *
+ * @param currentRanges collection of the ranges by current token
+ * @param updatedRanges collection of the ranges after token is changed
+ * @return pair of ranges to stream/fetch for given current and updated range collections
+ */
+ public static Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRanges(RangesAtEndpoint currentRanges, RangesAtEndpoint updatedRanges)
+ {
+ RangesAtEndpoint.Builder toStream = RangesAtEndpoint.builder(currentRanges.endpoint());
+ RangesAtEndpoint.Builder toFetch = RangesAtEndpoint.builder(currentRanges.endpoint());
+ logger.debug("Calculating toStream");
+ computeRanges(currentRanges, updatedRanges, toStream);
+
+ logger.debug("Calculating toFetch");
+ computeRanges(updatedRanges, currentRanges, toFetch);
+
+ logger.debug("To stream {}", toStream);
+ logger.debug("To fetch {}", toFetch);
+ return Pair.create(toStream.build(), toFetch.build());
+ }
+
+ private static void computeRanges(RangesAtEndpoint srcRanges, RangesAtEndpoint dstRanges, RangesAtEndpoint.Builder ranges)
+ {
+ for (Replica src : srcRanges)
+ {
+ boolean intersect = false;
+ RangesAtEndpoint remainder = null;
+ for (Replica dst : dstRanges)
+ {
+ logger.debug("Comparing {} and {}", src, dst);
+ // Stream the full range if there's no intersection
+ if (!src.intersectsOnRange(dst))
+ continue;
+
+ // If we're transitioning from full to transient
+ if (src.isFull() && dst.isTransient())
+ continue;
+
+ if (remainder == null)
+ {
+ remainder = src.subtractIgnoreTransientStatus(dst.range());
+ }
+ else
+ {
+ // Re-subtract ranges to avoid overstreaming in cases when the single range is split or merged
+ RangesAtEndpoint.Builder newRemainder = new RangesAtEndpoint.Builder(remainder.endpoint());
+ for (Replica replica : remainder)
+ newRemainder.addAll(replica.subtractIgnoreTransientStatus(dst.range()));
+ remainder = newRemainder.build();
+ }
+ intersect = true;
+ }
+
+ if (!intersect)
+ {
+ assert remainder == null;
+ logger.debug(" Doesn't intersect adding {}", src);
+ ranges.add(src); // should stream whole old range
+ }
+ else
+ {
+ ranges.addAll(remainder);
+ logger.debug(" Intersects adding {}", remainder);
+ }
+ }
+ }
+
+ public Future<StreamState> stream()
+ {
+ return streamPlan.execute();
+ }
+
+ public boolean streamsNeeded()
+ {
+ return !streamPlan.isEmpty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a979f1c..391598c 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -236,7 +236,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private Collection<Token> bootstrapTokens = null;
// true when keeping strict consistency while bootstrapping
- private static final boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true"));
+ public static final boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true"));
private static final boolean allowSimultaneousMoves = Boolean.parseBoolean(System.getProperty("cassandra.consistent.simultaneousmoves.allow","false"));
private static final boolean joinRing = Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"));
private boolean replacing;
@@ -1227,7 +1227,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
streamStateStore,
false,
DatabaseDescriptor.getStreamingConnectionsPerHost());
- streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
if (sourceDc != null)
streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));
@@ -4316,208 +4315,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.debug("Successfully moved to new token {}", getLocalTokens().iterator().next());
}
- @VisibleForTesting
- public static class RangeRelocator
- {
- private final StreamPlan streamPlan = new StreamPlan(StreamOperation.RELOCATION);
- private final InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
- private final TokenMetadata tokenMetaCloneAllSettled;
- // clone to avoid concurrent modification in calculateNaturalReplicas
- private final TokenMetadata tokenMetaClone;
- private final Collection<Token> tokens;
- private final List<String> keyspaceNames;
-
-
- private RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames, TokenMetadata tmd)
- {
- this.tokens = tokens;
- this.keyspaceNames = keyspaceNames;
- this.tokenMetaCloneAllSettled = tmd.cloneAfterAllSettled();
- // clone to avoid concurrent modification in calculateNaturalReplicas
- this.tokenMetaClone = tmd.cloneOnlyTokenMap();
- }
-
- @VisibleForTesting
- public RangeRelocator()
- {
- this.tokens = null;
- this.keyspaceNames = null;
- this.tokenMetaCloneAllSettled = null;
- this.tokenMetaClone = null;
- }
-
- /**
- * Wrapper that supplies accessors to the real implementations of the various dependencies for this method
- */
- private Multimap<InetAddressAndPort, FetchReplica> calculateRangesToFetchWithPreferredEndpoints(AbstractReplicationStrategy strategy, RangesAtEndpoint fetchRanges, String keyspace)
- {
- EndpointsByReplica preferredEndpoints =
- RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) -> DatabaseDescriptor.getEndpointSnitch().sortedByProximity(address, replicas),
- strategy,
- fetchRanges,
- useStrictConsistency,
- tokenMetaClone,
- tokenMetaCloneAllSettled,
- RangeStreamer.ALIVE_PREDICATE,
- keyspace,
- Collections.emptyList());
- return RangeStreamer.convertPreferredEndpointsToWorkMap(preferredEndpoints);
- }
-
- /**
- * calculating endpoints to stream current ranges to if needed
- * in some situations node will handle current ranges as part of the new ranges
- **/
- public RangesByEndpoint calculateRangesToStreamWithEndpoints(RangesAtEndpoint streamRanges,
- AbstractReplicationStrategy strat,
- TokenMetadata tmdBefore,
- TokenMetadata tmdAfter)
- {
- RangesByEndpoint.Mutable endpointRanges = new RangesByEndpoint.Mutable();
- for (Replica toStream : streamRanges)
- {
- //If the range we are sending is full only send it to the new full replica
- //There will also be a new transient replica we need to send the data to, but not
- //the repaired data
- EndpointsForRange currentEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdBefore);
- EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdAfter);
- logger.debug("Need to stream {}, current endpoints {}, new endpoints {}", toStream, currentEndpoints, newEndpoints);
-
- for (Replica current : currentEndpoints)
- {
- for (Replica updated : newEndpoints)
- {
- if (current.endpoint().equals(updated.endpoint()))
- {
- //Nothing to do
- if (current.equals(updated))
- break;
-
- //In these two (really three) cases the existing data is sufficient and we should subtract whatever is already replicated
- if (current.isFull() == updated.isFull() || current.isFull())
- {
- //First subtract what we already have
- Set<Range<Token>> subsToStream = toStream.range().subtract(current.range());
- //Now we only stream what is still replicated
- subsToStream = subsToStream.stream().flatMap(range -> range.intersectionWith(updated.range()).stream()).collect(Collectors.toSet());
- for (Range<Token> subrange : subsToStream)
- {
- //Only stream what intersects with what is in the new world
- Set<Range<Token>> intersections = subrange.intersectionWith(updated.range());
- for (Range<Token> intersection : intersections)
- {
- endpointRanges.put(updated.endpoint(), updated.decorateSubrange(intersection));
- }
- }
- }
- else
- {
- for (Range<Token> intersection : toStream.range().intersectionWith(updated.range()))
- {
- endpointRanges.put(updated.endpoint(), updated.decorateSubrange(intersection));
- }
- }
- }
- }
- }
-
- for (Replica updated : newEndpoints)
- {
- if (!currentEndpoints.byEndpoint().containsKey(updated.endpoint()))
- {
- // Completely new range for this endpoint
- if (toStream.isTransient() && updated.isFull())
- {
- throw new AssertionError(String.format("Need to stream %s, but only have %s which is transient and not full", updated, toStream));
- }
- for (Range<Token> intersection : updated.range().intersectionWith(toStream.range()))
- {
- endpointRanges.put(updated.endpoint(), updated.decorateSubrange(intersection));
- }
- }
- }
- }
- return endpointRanges.asImmutableView();
- }
-
- private void calculateToFromStreams()
- {
- logger.debug("Current tmd " + tokenMetaClone);
- logger.debug("Updated tmd " + tokenMetaCloneAllSettled);
- for (String keyspace : keyspaceNames)
- {
- // replication strategy of the current keyspace
- AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
- // getting collection of the currently used ranges by this keyspace
- RangesAtEndpoint currentReplicas = strategy.getAddressReplicas(localAddress);
-
- logger.info("Calculating ranges to stream and request for keyspace {}", keyspace);
- //From what I have seen we only ever call this with a single token from StorageService.move(Token)
- for (Token newToken : tokens)
- {
- Collection<Token> currentTokens = tokenMetaClone.getTokens(localAddress);
- if (currentTokens.size() > 1 || currentTokens.isEmpty())
- {
- throw new AssertionError("Unexpected current tokens: " + currentTokens);
- }
-
- // collection of ranges which this node will serve after move to the new token
- RangesAtEndpoint updatedReplicas = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
-
- // calculated parts of the ranges to request/stream from/to nodes in the ring
- Pair<RangesAtEndpoint, RangesAtEndpoint> streamAndFetchOwnRanges = Pair.create(RangesAtEndpoint.empty(localAddress), RangesAtEndpoint.empty(localAddress));
- //In the single node token move there is nothing to do and Range subtraction is broken
- //so it's easier to just identify this case up front.
- if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()
-)).size() > 1)
- {
- streamAndFetchOwnRanges = calculateStreamAndFetchRanges(currentReplicas, updatedReplicas);
- }
-
- Multimap<InetAddressAndPort, FetchReplica> workMap = calculateRangesToFetchWithPreferredEndpoints(strategy, streamAndFetchOwnRanges.right, keyspace);
-
- RangesByEndpoint endpointRanges = calculateRangesToStreamWithEndpoints(streamAndFetchOwnRanges.left, strategy, tokenMetaClone, tokenMetaCloneAllSettled);
-
- logger.info("Endpoint ranges to stream to " + endpointRanges);
-
- // stream ranges
- for (InetAddressAndPort address : endpointRanges.keySet())
- {
- logger.debug("Will stream range {} of keyspace {} to endpoint {}", endpointRanges.get(address), keyspace, address);
- RangesAtEndpoint ranges = endpointRanges.get(address);
- streamPlan.transferRanges(address, keyspace, ranges);
- }
-
- // stream requests
- workMap.asMap().forEach((address, sourceAndOurReplicas) -> {
- RangesAtEndpoint full = sourceAndOurReplicas.stream()
- .filter(pair -> pair.remote.isFull())
- .map(pair -> pair.local)
- .collect(RangesAtEndpoint.collector(localAddress));
- RangesAtEndpoint transientReplicas = sourceAndOurReplicas.stream()
- .filter(pair -> pair.remote.isTransient())
- .map(pair -> pair.local)
- .collect(RangesAtEndpoint.collector(localAddress));
- logger.debug("Will request range {} of keyspace {} from endpoint {}", workMap.get(address), keyspace, address);
- streamPlan.requestRanges(address, keyspace, full, transientReplicas);
- });
-
- logger.debug("Keyspace {}: work map {}.", keyspace, workMap);
- }
- }
- }
-
- public Future<StreamState> stream()
- {
- return streamPlan.execute();
- }
-
- public boolean streamsNeeded()
- {
- return !streamPlan.isEmpty();
- }
- }
-
public String getRemovalStatus()
{
return getRemovalStatus(false);
@@ -5271,115 +5068,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return streamPlan.execute();
}
- /**
- * Calculate pair of ranges to stream/fetch for given two range collections
- * (current ranges for keyspace and ranges after move to new token)
- *
- * With transient replication the added wrinkle is that if a range transitions from full to transient then
- * we need to stream the range despite the fact that we are retaining it as transient. Some replica
- * somewhere needs to transition from transient to full and we will be the source.
- *
- * If the range is transient and is transitioning to full then always fetch even if the range was already transient
- * since a transiently replicated replica obviously needs to fetch data to become full.
- *
- * This is why there is a continue after checking for intersection because intersection is not sufficient reason
- * to do the subtraction since we might need to stream/fetch data anyways.
- *
- * @param current collection of the ranges by current token
- * @param updated collection of the ranges after token is changed
- * @return pair of ranges to stream/fetch for given current and updated range collections
- */
- public static Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRanges(RangesAtEndpoint current, RangesAtEndpoint updated)
- {
- // FIXME: transient replication
- // this should always be the local node, except for tests TODO: assert this
- RangesAtEndpoint.Builder toStream = RangesAtEndpoint.builder(current.endpoint());
- RangesAtEndpoint.Builder toFetch = RangesAtEndpoint.builder(current.endpoint());
-
- logger.debug("Calculating toStream");
- for (Replica r1 : current)
- {
- boolean intersect = false;
- RangesAtEndpoint.Mutable remainder = null;
- for (Replica r2 : updated)
- {
- logger.debug("Comparing {} and {}", r1, r2);
- //If we will end up transiently replicating send the entire thing and don't subtract
- if (r1.intersectsOnRange(r2) && !(r1.isFull() && r2.isTransient()))
- {
- RangesAtEndpoint.Mutable oldRemainder = remainder;
- remainder = new RangesAtEndpoint.Mutable(current.endpoint());
- if (oldRemainder != null)
- {
- for (Replica replica : oldRemainder)
- {
- remainder.addAll(replica.subtractIgnoreTransientStatus(r2.range()));
- }
- }
- else
- {
- remainder.addAll(r1.subtractIgnoreTransientStatus(r2.range()));
- }
- logger.debug(" Intersects adding {}", remainder);
- intersect = true;
- }
- }
- if (!intersect)
- {
- logger.debug(" Doesn't intersect adding {}", r1);
- toStream.add(r1); // should stream whole old range
- }
- else
- {
- toStream.addAll(remainder);
- }
- }
-
- logger.debug("Calculating toFetch");
- for (Replica r2 : updated)
- {
- boolean intersect = false;
- RangesAtEndpoint.Mutable remainder = null;
- for (Replica r1 : current)
- {
- logger.info("Comparing {} and {}", r2, r1);
- //Transitioning from transient to full means fetch everything so intersection doesn't matter.
- if (r2.intersectsOnRange(r1) && !(r1.isTransient() && r2.isFull()))
- {
- RangesAtEndpoint.Mutable oldRemainder = remainder;
- remainder = new RangesAtEndpoint.Mutable(current.endpoint());
- if (oldRemainder != null)
- {
- for (Replica replica : oldRemainder)
- {
- remainder.addAll(replica.subtractIgnoreTransientStatus(r1.range()));
- }
- }
- else
- {
- remainder.addAll(r2.subtractIgnoreTransientStatus(r1.range()));
- }
- logger.debug(" Intersects adding {}", remainder);
- intersect = true;
- }
- }
- if (!intersect)
- {
- logger.debug(" Doesn't intersect adding {}", r2);
- toFetch.add(r2); // should fetch whole old range
- }
- else
- {
- toFetch.addAll(remainder);
- }
- }
-
- logger.debug("To stream {}", toStream);
- logger.debug("To fetch {}", toFetch);
-
- return Pair.create(toStream.build(), toFetch.build());
- }
-
public void bulkLoad(String directory)
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 2f6deb5..ea54f9d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -70,6 +70,16 @@ public class StreamPlan
/**
* Request data in {@code keyspace} and {@code ranges} from specific node.
*
+ * Here, we have to encode both _local_ range transientness (encoded in Replica itself, in RangesAtEndpoint)
+ * and _remote_ (source) range transientness, which is encoded by splitting ranges into full and transient.
+ *
+ * At the other end the distinction between full and transient is ignored; it just uses the transient status
+ * of the Replica objects we send to determine what to send. The real reason we have this split down to
+ * StreamRequest is that on completion StreamRequest is used to write to the system table tracking
+ * what has already been streamed. At that point we only have the local Replica instances, so we don't
+ * know what we got from the remote. We preserve that here by splitting based on the remote's transient
+ * status.
+ *
* @param from endpoint address to fetch data from.
* @param keyspace name of keyspace
* @param fullRanges ranges to fetch that from provides the full version of
@@ -94,10 +104,9 @@ public class StreamPlan
public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, String... columnFamilies)
{
//It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node
- assert all(fullRanges, Replica::isLocal) || all(fullRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) :
- fullRanges.toString();
- assert all(transientRanges, Replica::isLocal) || all(transientRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) :
- transientRanges.toString();
+ assert all(fullRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
+ assert all(transientRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
+
StreamSession session = coordinator.getOrCreateNextSession(from);
session.addStreamRequest(keyspace, fullRanges, transientRanges, Arrays.asList(columnFamilies));
return this;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index ec80772..d7d0836 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -300,6 +300,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
/**
* Request data fetch task to this session.
*
+ * Here, we have to encode both _local_ range transientness (encoded in Replica itself, in RangesAtEndpoint)
+ * and _remote_ (source) range transientness, which is encoded by splitting ranges into full and transient.
+ *
* @param keyspace Requesting keyspace
* @param fullRanges Ranges to retrieve data that will return full data from the source
* @param transientRanges Ranges to retrieve data that will return transient data from the source
@@ -308,8 +311,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
public void addStreamRequest(String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, Collection<String> columnFamilies)
{
//It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node
- assert all(fullRanges, Replica::isLocal) || all(fullRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) : fullRanges.toString();
- assert all(transientRanges, Replica::isLocal) || all(transientRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) : transientRanges.toString();
+ assert all(fullRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
+ assert all(transientRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
+
requests.add(new StreamRequest(keyspace, fullRanges, transientRanges, columnFamilies));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org