You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:36:07 UTC
[15/18] cassandra git commit: Transient Replication and Cheap Quorums
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 110fed6..e8aa5d3 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -18,27 +18,40 @@
package org.apache.cassandra.dht;
import java.util.*;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
+import com.google.common.base.Predicate;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.EndpointsByReplica;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.locator.EndpointsByRange;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.PreviewKind;
@@ -47,13 +60,25 @@ import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.utils.FBUtilities;
+import static com.google.common.base.Predicates.and;
+import static com.google.common.base.Predicates.not;
+import static com.google.common.collect.Iterables.all;
+import static com.google.common.collect.Iterables.any;
+import static org.apache.cassandra.locator.Replica.fullReplica;
+
/**
- * Assists in streaming ranges to a node.
+ * Assists in streaming ranges to this node.
*/
public class RangeStreamer
{
private static final Logger logger = LoggerFactory.getLogger(RangeStreamer.class);
+ public static Predicate<Replica> ALIVE_PREDICATE = replica ->
+ (!Gossiper.instance.isEnabled() ||
+ (Gossiper.instance.getEndpointStateForEndpoint(replica.endpoint()) == null ||
+ Gossiper.instance.getEndpointStateForEndpoint(replica.endpoint()).isAlive())) &&
+ FailureDetector.instance.isAlive(replica.endpoint());
+
/* bootstrap tokens. can be null if replacing the node. */
private final Collection<Token> tokens;
/* current token ring */
@@ -62,26 +87,59 @@ public class RangeStreamer
private final InetAddressAndPort address;
/* streaming description */
private final String description;
- private final Multimap<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> toFetch = HashMultimap.create();
- private final Set<ISourceFilter> sourceFilters = new HashSet<>();
+ private final Multimap<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch = HashMultimap.create();
+ private final Set<Predicate<Replica>> sourceFilters = new HashSet<>();
private final StreamPlan streamPlan;
private final boolean useStrictConsistency;
private final IEndpointSnitch snitch;
private final StreamStateStore stateStore;
- /**
- * A filter applied to sources to stream from when constructing a fetch map.
- */
- public static interface ISourceFilter
+ public static class FetchReplica
{
- public boolean shouldInclude(InetAddressAndPort endpoint);
+ public final Replica local;
+ public final Replica remote;
+
+ public FetchReplica(Replica local, Replica remote)
+ {
+ Preconditions.checkNotNull(local);
+ Preconditions.checkNotNull(remote);
+ assert local.isLocal() && !remote.isLocal();
+ this.local = local;
+ this.remote = remote;
+ }
+
+ public String toString()
+ {
+ return "FetchReplica{" +
+ "local=" + local +
+ ", remote=" + remote +
+ '}';
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ FetchReplica that = (FetchReplica) o;
+
+ if (!local.equals(that.local)) return false;
+ return remote.equals(that.remote);
+ }
+
+ public int hashCode()
+ {
+ int result = local.hashCode();
+ result = 31 * result + remote.hashCode();
+ return result;
+ }
}
/**
* Source filter which excludes any endpoints that are not alive according to a
* failure detector.
*/
- public static class FailureDetectorSourceFilter implements ISourceFilter
+ public static class FailureDetectorSourceFilter implements Predicate<Replica>
{
private final IFailureDetector fd;
@@ -90,16 +148,16 @@ public class RangeStreamer
this.fd = fd;
}
- public boolean shouldInclude(InetAddressAndPort endpoint)
+ public boolean apply(Replica replica)
{
- return fd.isAlive(endpoint);
+ return fd.isAlive(replica.endpoint());
}
}
/**
* Source filter which excludes any endpoints that are not in a specific data center.
*/
- public static class SingleDatacenterFilter implements ISourceFilter
+ public static class SingleDatacenterFilter implements Predicate<Replica>
{
private final String sourceDc;
private final IEndpointSnitch snitch;
@@ -110,27 +168,27 @@ public class RangeStreamer
this.snitch = snitch;
}
- public boolean shouldInclude(InetAddressAndPort endpoint)
+ public boolean apply(Replica replica)
{
- return snitch.getDatacenter(endpoint).equals(sourceDc);
+ return snitch.getDatacenter(replica).equals(sourceDc);
}
}
/**
* Source filter which excludes the current node from source calculations
*/
- public static class ExcludeLocalNodeFilter implements ISourceFilter
+ public static class ExcludeLocalNodeFilter implements Predicate<Replica>
{
- public boolean shouldInclude(InetAddressAndPort endpoint)
+ public boolean apply(Replica replica)
{
- return !FBUtilities.getBroadcastAddressAndPort().equals(endpoint);
+ return !replica.isLocal();
}
}
/**
* Source filter which only includes endpoints contained within a provided set.
*/
- public static class WhitelistedSourcesFilter implements ISourceFilter
+ public static class WhitelistedSourcesFilter implements Predicate<Replica>
{
private final Set<InetAddressAndPort> whitelistedSources;
@@ -139,9 +197,9 @@ public class RangeStreamer
this.whitelistedSources = whitelistedSources;
}
- public boolean shouldInclude(InetAddressAndPort endpoint)
+ public boolean apply(Replica replica)
{
- return whitelistedSources.contains(endpoint);
+ return whitelistedSources.contains(replica.endpoint());
}
}
@@ -167,7 +225,7 @@ public class RangeStreamer
streamPlan.listeners(this.stateStore);
}
- public void addSourceFilter(ISourceFilter filter)
+ public void addSourceFilter(Predicate<Replica> filter)
{
sourceFilters.add(filter);
}
@@ -176,80 +234,95 @@ public class RangeStreamer
* Add ranges to be streamed for given keyspace.
*
* @param keyspaceName keyspace name
- * @param ranges ranges to be streamed
+ * @param replicas replicas on this node whose ranges are to be streamed
*/
- public void addRanges(String keyspaceName, Collection<Range<Token>> ranges)
+ public void addRanges(String keyspaceName, ReplicaCollection<?> replicas)
{
- if(Keyspace.open(keyspaceName).getReplicationStrategy() instanceof LocalStrategy)
+ Keyspace keyspace = Keyspace.open(keyspaceName);
+ AbstractReplicationStrategy strat = keyspace.getReplicationStrategy();
+ if(strat instanceof LocalStrategy)
{
logger.info("Not adding ranges for Local Strategy keyspace={}", keyspaceName);
return;
}
- boolean useStrictSource = useStrictSourcesForRanges(keyspaceName);
- Multimap<Range<Token>, InetAddressAndPort> rangesForKeyspace = useStrictSource
- ? getAllRangesWithStrictSourcesFor(keyspaceName, ranges) : getAllRangesWithSourcesFor(keyspaceName, ranges);
+ boolean useStrictSource = useStrictSourcesForRanges(strat);
+ EndpointsByReplica fetchMap = calculateRangesToFetchWithPreferredEndpoints(replicas, keyspace, useStrictSource);
- for (Map.Entry<Range<Token>, InetAddressAndPort> entry : rangesForKeyspace.entries())
+ for (Map.Entry<Replica, Replica> entry : fetchMap.flattenEntries())
logger.info("{}: range {} exists on {} for keyspace {}", description, entry.getKey(), entry.getValue(), keyspaceName);
- AbstractReplicationStrategy strat = Keyspace.open(keyspaceName).getReplicationStrategy();
- Multimap<InetAddressAndPort, Range<Token>> rangeFetchMap = useStrictSource || strat == null || strat.getReplicationFactor() == 1
- ? getRangeFetchMap(rangesForKeyspace, sourceFilters, keyspaceName, useStrictConsistency)
- : getOptimizedRangeFetchMap(rangesForKeyspace, sourceFilters, keyspaceName);
- for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : rangeFetchMap.asMap().entrySet())
+ Multimap<InetAddressAndPort, FetchReplica> workMap;
+ //Only use the optimized strategy if we don't care about strict sources, have a replication factor > 1, and no
+ //transient replicas.
+ if (useStrictSource || strat == null || strat.getReplicationFactor().allReplicas == 1 || strat.getReplicationFactor().hasTransientReplicas())
+ {
+ workMap = convertPreferredEndpointsToWorkMap(fetchMap);
+ }
+ else
+ {
+ workMap = getOptimizedWorkMap(fetchMap, sourceFilters, keyspaceName);
+ }
+
+ toFetch.put(keyspaceName, workMap);
+ for (Map.Entry<InetAddressAndPort, Collection<FetchReplica>> entry : workMap.asMap().entrySet())
{
if (logger.isTraceEnabled())
{
- for (Range<Token> r : entry.getValue())
- logger.trace("{}: range {} from source {} for keyspace {}", description, r, entry.getKey(), keyspaceName);
+ for (FetchReplica r : entry.getValue())
+ logger.trace("{}: range source {} local range {} for keyspace {}", description, r.remote, r.local, keyspaceName);
}
- toFetch.put(keyspaceName, entry);
}
}
/**
- * @param keyspaceName keyspace name to check
+ * @param strat AbstractReplicationStrategy of keyspace to check
* @return true when the node is bootstrapping, useStrictConsistency is true and # of nodes in the cluster is more than # of replica
*/
- private boolean useStrictSourcesForRanges(String keyspaceName)
+ private boolean useStrictSourcesForRanges(AbstractReplicationStrategy strat)
{
- AbstractReplicationStrategy strat = Keyspace.open(keyspaceName).getReplicationStrategy();
return useStrictConsistency
&& tokens != null
- && metadata.getSizeOfAllEndpoints() != strat.getReplicationFactor();
+ && metadata.getSizeOfAllEndpoints() != strat.getReplicationFactor().allReplicas;
}
/**
- * Get a map of all ranges and their respective sources that are candidates for streaming the given ranges
- * to us. For each range, the list of sources is sorted by proximity relative to the given destAddress.
- *
- * @throws java.lang.IllegalStateException when there is no source to get data streamed
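+ * Wrapper that assembles RangeStreamer's own state (snitch, token metadata, bootstrap tokens, source filters) into the arguments of the static implementation
+ * @param fetchRanges replicas on this node whose ranges need to be fetched
+ * @param keyspace keyspace whose replication strategy determines the candidate sources
+ * @param useStrictConsistency if true, stream from the replicas giving up the range rather than the closest live replica; requires bootstrap tokens (IllegalArgumentException otherwise)
+ * @return map from each replica in fetchRanges to the source replicas to fetch it from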
*/
- private Multimap<Range<Token>, InetAddressAndPort> getAllRangesWithSourcesFor(String keyspaceName, Collection<Range<Token>> desiredRanges)
+ private EndpointsByReplica calculateRangesToFetchWithPreferredEndpoints(ReplicaCollection<?> fetchRanges, Keyspace keyspace, boolean useStrictConsistency)
{
- AbstractReplicationStrategy strat = Keyspace.open(keyspaceName).getReplicationStrategy();
- Multimap<Range<Token>, InetAddressAndPort> rangeAddresses = strat.getRangeAddresses(metadata.cloneOnlyTokenMap());
+ AbstractReplicationStrategy strat = keyspace.getReplicationStrategy();
- Multimap<Range<Token>, InetAddressAndPort> rangeSources = ArrayListMultimap.create();
- for (Range<Token> desiredRange : desiredRanges)
- {
- for (Range<Token> range : rangeAddresses.keySet())
- {
- if (range.contains(desiredRange))
- {
- List<InetAddressAndPort> preferred = snitch.getSortedListByProximity(address, rangeAddresses.get(range));
- rangeSources.putAll(desiredRange, preferred);
- break;
- }
- }
+ TokenMetadata tmd = metadata.cloneOnlyTokenMap();
+
+ TokenMetadata tmdAfter = null;
- if (!rangeSources.keySet().contains(desiredRange))
- throw new IllegalStateException("No sources found for " + desiredRange);
+ if (tokens != null)
+ {
+ // Pending ranges
+ tmdAfter = tmd.cloneOnlyTokenMap();
+ tmdAfter.updateNormalTokens(tokens, address);
}
+ else if (useStrictConsistency)
+ {
+ throw new IllegalArgumentException("Can't ask for strict consistency and not supply tokens");
+ }
+
+ return RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(snitch::sortedByProximity,
+ strat,
+ fetchRanges,
+ useStrictConsistency,
+ tmd,
+ tmdAfter,
+ ALIVE_PREDICATE,
+ keyspace.getName(),
+ sourceFilters);
- return rangeSources;
}
/**
@@ -257,129 +330,234 @@ public class RangeStreamer
* For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
* consistency.
*
- * @throws java.lang.IllegalStateException when there is no source to get data streamed, or more than 1 source found.
- */
- private Multimap<Range<Token>, InetAddressAndPort> getAllRangesWithStrictSourcesFor(String keyspace, Collection<Range<Token>> desiredRanges)
+ **/
+ public static EndpointsByReplica
+ calculateRangesToFetchWithPreferredEndpoints(BiFunction<InetAddressAndPort, EndpointsForRange, EndpointsForRange> snitchGetSortedListByProximity,
+ AbstractReplicationStrategy strat,
+ ReplicaCollection<?> fetchRanges,
+ boolean useStrictConsistency,
+ TokenMetadata tmdBefore,
+ TokenMetadata tmdAfter,
+ Predicate<Replica> isAlive,
+ String keyspace,
+ Collection<Predicate<Replica>> sourceFilters)
{
- assert tokens != null;
- AbstractReplicationStrategy strat = Keyspace.open(keyspace).getReplicationStrategy();
-
- // Active ranges
- TokenMetadata metadataClone = metadata.cloneOnlyTokenMap();
- Multimap<Range<Token>, InetAddressAndPort> addressRanges = strat.getRangeAddresses(metadataClone);
+ EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
- // Pending ranges
- metadataClone.updateNormalTokens(tokens, address);
- Multimap<Range<Token>, InetAddressAndPort> pendingRangeAddresses = strat.getRangeAddresses(metadataClone);
+ InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
+ logger.debug ("Keyspace: {}", keyspace);
+ logger.debug("To fetch RN: {}", fetchRanges);
+ logger.debug("Fetch ranges: {}", rangeAddresses);
- // Collects the source that will have its range moved to the new node
- Multimap<Range<Token>, InetAddressAndPort> rangeSources = ArrayListMultimap.create();
+ Predicate<Replica> testSourceFilters = and(sourceFilters);
+ Function<EndpointsForRange, EndpointsForRange> sorted =
+ endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
- for (Range<Token> desiredRange : desiredRanges)
+ //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
+ EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
+ for (Replica toFetch : fetchRanges)
{
- for (Map.Entry<Range<Token>, Collection<InetAddressAndPort>> preEntry : addressRanges.asMap().entrySet())
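+ //A replica is sufficient if it holds all the data we need: any replica for a transient range, a full replica for a full range
+ //With strict consistency and transient replication the losing replicas may be of both types, so on the strict path
+ //we only ensure that at least one source is sufficient (adding one if needed) rather than filtering by it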
+ Predicate<Replica> isSufficient = r -> (toFetch.isTransient() || r.isFull());
+ Predicate<Replica> accept = r ->
+ isSufficient.test(r) // is sufficient
+ && !r.endpoint().equals(localAddress) // is not self
+ && isAlive.test(r); // is alive
+
+ logger.debug("To fetch {}", toFetch);
+ for (Range<Token> range : rangeAddresses.keySet())
{
- if (preEntry.getKey().contains(desiredRange))
+ if (range.contains(toFetch.range()))
{
- Set<InetAddressAndPort> oldEndpoints = Sets.newHashSet(preEntry.getValue());
- Set<InetAddressAndPort> newEndpoints = Sets.newHashSet(pendingRangeAddresses.get(desiredRange));
+ EndpointsForRange oldEndpoints = rangeAddresses.get(range);
- // Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
- // So we need to be careful to only be strict when endpoints == RF
- if (oldEndpoints.size() == strat.getReplicationFactor())
+ //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
+ //It could be multiple endpoints and we must fetch from all of them if they are there
+ //With transient replication and strict consistency this is to get the full data from a full replica and
+ //transient data from the transient replica losing data
+ EndpointsForRange sources;
+ if (useStrictConsistency)
+ {
+ //Start with two sets of who replicates the range before and who replicates it after
+ EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
+ logger.debug("Old endpoints {}", oldEndpoints);
+ logger.debug("New endpoints {}", newEndpoints);
+
+ //Due to CASSANDRA-5953 we can have a higher RF than we have endpoints.
+ //So we need to be careful to only be strict when endpoints == RF
+ if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
+ {
+ Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
+ // Remove new endpoints from old endpoints based on address
+ oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
+
+ if (!all(oldEndpoints, isAlive))
+ throw new IllegalStateException("A node required to move the data consistently is down: "
+ + oldEndpoints.filter(not(isAlive)));
+
+ if (oldEndpoints.size() > 1)
+ throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints);
+
+ //If we are transitioning from transient to full and the set of replicas for the range is not changing
+ //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely
+ //since we are already a transient replica and the existing replica remains.
+ //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
+ //So it's an error if we don't find what we need.
+ if (oldEndpoints.isEmpty() && toFetch.isTransient())
+ {
+ throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
+ }
+
+ if (!any(oldEndpoints, isSufficient))
+ {
+ // need an additional replica
+ EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
+ // include all our filters, to ensure we include a matching node
+ Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(accept, testSourceFilters)).toJavaUtil();
+ if (fullReplica.isPresent())
+ oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
+ else
+ throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + endpointsForRange);
+ }
+
+ //We have to check the source filters here to see if they will remove any replicas
+ //required for strict consistency
+ if (!all(oldEndpoints, testSourceFilters))
+ throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + oldEndpoints.filter(not(testSourceFilters)));
+ }
+ else
+ {
+ oldEndpoints = sorted.apply(oldEndpoints.filter(accept));
+ }
+
+ //Apply the testSourceFilters that were given to us (liveness of the remaining sources was already checked above)
+ sources = oldEndpoints.filter(testSourceFilters);
+ }
+ else
{
- oldEndpoints.removeAll(newEndpoints);
- assert oldEndpoints.size() == 1 : "Expected 1 endpoint but found " + oldEndpoints.size();
+ //Without strict consistency we have given up on correctness so no point in fetching from
+ //a random full + transient replica since it's also likely to lose data
+ //Also apply testSourceFilters that were given to us so we can safely select a single source
+ sources = sorted.apply(rangeAddresses.get(range).filter(and(accept, testSourceFilters)));
+ //Limit it to just the first possible source, we don't need more than one and downstream
+ //will fetch from every source we supply
+ sources = sources.size() > 0 ? sources.subList(0, 1) : sources;
}
- rangeSources.put(desiredRange, oldEndpoints.iterator().next());
+ // storing range and preferred endpoint set
+ rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE);
+ logger.debug("Endpoints to fetch for {} are {}", toFetch, sources);
}
}
- // Validate
- Collection<InetAddressAndPort> addressList = rangeSources.get(desiredRange);
- if (addressList == null || addressList.isEmpty())
- throw new IllegalStateException("No sources found for " + desiredRange);
-
- if (addressList.size() > 1)
- throw new IllegalStateException("Multiple endpoints found for " + desiredRange);
+ EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
+ if (addressList == null)
+ throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch);
+
+ /*
+ * When we move forwards (shrink our bucket) we are the one losing a range and no one else loses
+ * from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica
+ * and the other is a transient replica. So in that case we must fetch from two places for the full range we gain.
+ * For a transient range we only need to fetch from one.
+ */
+ if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1))
+ throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList));
+
+ //We must have enough stuff to fetch from
+ if ((toFetch.isFull() && !any(addressList, Replica::isFull)) || addressList.isEmpty())
+ {
+ if (strat.getReplicationFactor().allReplicas == 1)
+ {
+ if (useStrictConsistency)
+ {
+ logger.warn("A node required to move the data consistently is down");
+ throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " +
+ "Ensure this keyspace contains replicas in the source datacenter.");
+ }
+ else
+ logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " +
+ "Keyspace might be missing data.", toFetch, keyspace);
- InetAddressAndPort sourceIp = addressList.iterator().next();
- EndpointState sourceState = Gossiper.instance.getEndpointStateForEndpoint(sourceIp);
- if (Gossiper.instance.isEnabled() && (sourceState == null || !sourceState.isAlive()))
- throw new RuntimeException("A node required to move the data consistently is down (" + sourceIp + "). " +
- "If you wish to move the data from a potentially inconsistent replica, restart the node with -Dcassandra.consistent.rangemovement=false");
+ }
+ else
+ {
+ if (useStrictConsistency)
+ logger.warn("A node required to move the data consistently is down");
+ throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace);
+ }
+ }
}
-
- return rangeSources;
+ return rangesToFetchWithPreferredEndpoints.asImmutableView();
}
/**
- * @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value)
- * @param sourceFilters A (possibly empty) collection of source filters to apply. In addition to any filters given
- * here, we always exclude ourselves.
- * @param keyspace keyspace name
- * @return Map of source endpoint to collection of ranges
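+ * The preferred endpoint list is in the wrong format: it is keyed by Replica (this node) rather than by the source
+ * endpoint we will fetch from, which is what streaming wants.
+ * @param preferredEndpoints map from each local replica to the remote replicas to fetch it from
+ * @return multimap from source endpoint to the FetchReplicas (local/remote pairs) to stream from that endpoint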
*/
- private static Multimap<InetAddressAndPort, Range<Token>> getRangeFetchMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources,
- Collection<ISourceFilter> sourceFilters, String keyspace,
- boolean useStrictConsistency)
+ public static Multimap<InetAddressAndPort, FetchReplica> convertPreferredEndpointsToWorkMap(EndpointsByReplica preferredEndpoints)
{
- Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = HashMultimap.create();
- for (Range<Token> range : rangesWithSources.keySet())
+ Multimap<InetAddressAndPort, FetchReplica> workMap = HashMultimap.create();
+ for (Map.Entry<Replica, EndpointsForRange> e : preferredEndpoints.entrySet())
{
- boolean foundSource = false;
-
- outer:
- for (InetAddressAndPort address : rangesWithSources.get(range))
+ for (Replica source : e.getValue())
{
- for (ISourceFilter filter : sourceFilters)
- {
- if (!filter.shouldInclude(address))
- continue outer;
- }
+ assert (e.getKey()).isLocal();
+ assert !source.isLocal();
+ workMap.put(source.endpoint(), new FetchReplica(e.getKey(), source));
+ }
+ }
+ logger.debug("Work map {}", workMap);
+ return workMap;
+ }
- if (address.equals(FBUtilities.getBroadcastAddressAndPort()))
- {
- // If localhost is a source, we have found one, but we don't add it to the map to avoid streaming locally
- foundSource = true;
- continue;
- }
+ /**
+ * Optimized version that also outputs the final work map
+ */
+ private static Multimap<InetAddressAndPort, FetchReplica> getOptimizedWorkMap(EndpointsByReplica rangesWithSources,
+ Collection<Predicate<Replica>> sourceFilters, String keyspace)
+ {
+ //For now we just aren't going to use the optimized range fetch map with transient replication to shrink
+ //the surface area to test and introduce bugs.
+ //In the future it's possible we could run it twice once for full ranges with only full replicas
+ //and once with transient ranges and all replicas. Then merge the result.
+ EndpointsByRange.Mutable unwrapped = new EndpointsByRange.Mutable();
+ for (Map.Entry<Replica, Replica> entry : rangesWithSources.flattenEntries())
+ {
+ Replicas.temporaryAssertFull(entry.getValue());
+ unwrapped.put(entry.getKey().range(), entry.getValue());
+ }
- rangeFetchMapMap.put(address, range);
- foundSource = true;
- break; // ensure we only stream from one other node for each range
- }
+ RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(unwrapped.asImmutableView(), sourceFilters, keyspace);
+ Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = calculator.getRangeFetchMap();
+ logger.info("Output from RangeFetchMapCalculator for keyspace {}", keyspace);
+ validateRangeFetchMap(unwrapped.asImmutableView(), rangeFetchMapMap, keyspace);
- if (!foundSource)
+ //Need to rewrap as Replicas
+ Multimap<InetAddressAndPort, FetchReplica> wrapped = HashMultimap.create();
+ for (Map.Entry<InetAddressAndPort, Range<Token>> entry : rangeFetchMapMap.entries())
+ {
+ Replica toFetch = null;
+ for (Replica r : rangesWithSources.keySet())
{
- AbstractReplicationStrategy strat = Keyspace.open(keyspace).getReplicationStrategy();
- if (strat != null && strat.getReplicationFactor() == 1)
+ if (r.range().equals(entry.getValue()))
{
- if (useStrictConsistency)
- throw new IllegalStateException("Unable to find sufficient sources for streaming range " + range + " in keyspace " + keyspace + " with RF=1. " +
- "Ensure this keyspace contains replicas in the source datacenter.");
- else
- logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " +
- "Keyspace might be missing data.", range, keyspace);
+ if (toFetch != null)
+ throw new AssertionError(String.format("There shouldn't be multiple replicas for range %s, replica %s and %s here", r.range(), r, toFetch));
+ toFetch = r;
}
- else
- throw new IllegalStateException("Unable to find sufficient sources for streaming range " + range + " in keyspace " + keyspace);
}
+ if (toFetch == null)
+ throw new AssertionError("Shouldn't be possible for the Replica we fetch to be null here");
+ //Committing the cardinal sin of synthesizing a Replica, but it's ok because we assert earlier all of them
+ //are full and optimized range fetch map doesn't support transient replication yet.
+ wrapped.put(entry.getKey(), new FetchReplica(toFetch, fullReplica(entry.getKey(), entry.getValue())));
}
- return rangeFetchMapMap;
- }
-
-
- private static Multimap<InetAddressAndPort, Range<Token>> getOptimizedRangeFetchMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources,
- Collection<ISourceFilter> sourceFilters, String keyspace)
- {
- RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, sourceFilters, keyspace);
- Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = calculator.getRangeFetchMap();
- logger.info("Output from RangeFetchMapCalculator for keyspace {}", keyspace);
- validateRangeFetchMap(rangesWithSources, rangeFetchMapMap, keyspace);
- return rangeFetchMapMap;
+ return wrapped;
}
/**
@@ -388,7 +566,7 @@ public class RangeStreamer
* @param rangeFetchMapMap
* @param keyspace
*/
- private static void validateRangeFetchMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap, String keyspace)
+ private static void validateRangeFetchMap(EndpointsByRange rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap, String keyspace)
{
for (Map.Entry<InetAddressAndPort, Range<Token>> entry : rangeFetchMapMap.entries())
{
@@ -398,7 +576,7 @@ public class RangeStreamer
+ " in keyspace " + keyspace);
}
- if (!rangesWithSources.get(entry.getValue()).contains(entry.getKey()))
+ if (!rangesWithSources.get(entry.getValue()).endpoints().contains(entry.getKey()))
{
throw new IllegalStateException("Trying to stream from wrong endpoint. Range: " + entry.getValue()
+ " in keyspace " + keyspace + " from endpoint: " + entry.getKey());
@@ -408,39 +586,70 @@ public class RangeStreamer
}
}
- public static Multimap<InetAddressAndPort, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSourceTarget, String keyspace,
- IFailureDetector fd, boolean useStrictConsistency)
- {
- return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(fd)), keyspace, useStrictConsistency);
- }
-
// For testing purposes
@VisibleForTesting
- Multimap<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> toFetch()
+ Multimap<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch()
{
return toFetch;
}
+    /**
+     * Requests every entry of the fetch plan that is not already recorded as available locally by
+     * {@code stateStore}, grouped per keyspace and source endpoint, and then executes the stream plan.
+     * <p>
+     * Each source's ranges are requested as two sets, full and transient, split according to the
+     * transientness of the remote replica being streamed from.
+     *
+     * @return the future of the executed stream plan
+     */
public StreamResultFuture fetchAsync()
{
- for (Map.Entry<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> entry : toFetch.entries())
- {
- String keyspace = entry.getKey();
- InetAddressAndPort source = entry.getValue().getKey();
- Collection<Range<Token>> ranges = entry.getValue().getValue();
+        toFetch.forEach((keyspace, sources) -> {
+            logger.debug("Keyspace {} Sources {}", keyspace, sources);
+            sources.asMap().forEach((source, fetchReplicas) -> {
- // filter out already streamed ranges
- Set<Range<Token>> availableRanges = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner);
- if (ranges.removeAll(availableRanges))
- {
- logger.info("Some ranges of {} are already available. Skipping streaming those ranges.", availableRanges);
- }
+                // filter out already streamed ranges
+                RangesAtEndpoint available = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner);
- if (logger.isTraceEnabled())
- logger.trace("{}ing from {} ranges {}", description, source, StringUtils.join(ranges, ", "));
- /* Send messages to respective folks to stream data over to me */
- streamPlan.requestRanges(source, keyspace, ranges);
- }
+                // A fetch is skipped when its local range is already recorded as available. For a full local
+                // replica, the recorded availability must also match the fullness of the remote replica.
+                Predicate<FetchReplica> isAvailable = fetch -> {
+                    Replica availableRange = available.byRange().get(fetch.local.range());
+                    if (availableRange == null)
+                        //Range is unavailable
+                        return false;
+                    if (fetch.local.isFull())
+                        //For full, pick only replicas with matching transientness
+                        return availableRange.isFull() == fetch.remote.isFull();
+
+                    // Any transient or full will do
+                    return true;
+                };
+
+                List<FetchReplica> remaining = fetchReplicas.stream().filter(not(isAvailable)).collect(Collectors.toList());
+
+                // Some fetches were skipped exactly when fewer remain than were planned for this source.
+                // (The number of locally available ranges for the keyspace is unrelated to this source.)
+                if (remaining.size() < fetchReplicas.size())
+                {
+                    List<FetchReplica> skipped = fetchReplicas.stream().filter(isAvailable).collect(Collectors.toList());
+                    logger.info("Some ranges of {} are already available. Skipping streaming those ranges. Skipping {}. Fully available {} Transiently available {}",
+                                fetchReplicas, skipped, available.filter(Replica::isFull).ranges(), available.filter(Replica::isTransient).ranges());
+                }
+
+                if (logger.isTraceEnabled())
+                    logger.trace("{}ing from {} ranges {}", description, source, StringUtils.join(remaining, ", "));
+
+                // At the other end the distinction between full and transient is ignored; it just uses the
+                // transient status of the Replica objects we send to determine what to send. The real reason
+                // we carry this split down to StreamRequest is that, on completion, StreamRequest is used to
+                // write to the system table tracking what has already been streamed. At that point we only
+                // have the local Replica instances, so we would not know what we got from the remote. We
+                // preserve that here by splitting based on the remote's transient status.
+                InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
+                RangesAtEndpoint full = remaining.stream()
+                                                 .filter(pair -> pair.remote.isFull())
+                                                 .map(pair -> pair.local)
+                                                 .collect(RangesAtEndpoint.collector(self));
+                RangesAtEndpoint transientReplicas = remaining.stream()
+                                                              .filter(pair -> pair.remote.isTransient())
+                                                              .map(pair -> pair.local)
+                                                              .collect(RangesAtEndpoint.collector(self));
+
+                logger.debug("Source and our replicas {}", fetchReplicas);
+                logger.debug("Source {} Keyspace {} streaming full {} transient {}", source, keyspace, full, transientReplicas);
+
+                /* Send messages to respective folks to stream data over to me */
+                streamPlan.requestRanges(source, keyspace, full, transientReplicas);
+            });
+        });
return streamPlan.execute();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/Splitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Splitter.java b/src/java/org/apache/cassandra/dht/Splitter.java
index c63fe91..8578448 100644
--- a/src/java/org/apache/cassandra/dht/Splitter.java
+++ b/src/java/org/apache/cassandra/dht/Splitter.java
@@ -25,6 +25,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
@@ -117,32 +118,31 @@ public abstract class Splitter
return new BigDecimal(elapsedTokens(token, range)).divide(new BigDecimal(tokensInRange(range)), 3, BigDecimal.ROUND_HALF_EVEN).doubleValue();
}
- public List<Token> splitOwnedRanges(int parts, List<Range<Token>> localRanges, boolean dontSplitRanges)
+ public List<Token> splitOwnedRanges(int parts, List<WeightedRange> weightedRanges, boolean dontSplitRanges)
{
- if (localRanges.isEmpty() || parts == 1)
+ if (weightedRanges.isEmpty() || parts == 1)
return Collections.singletonList(partitioner.getMaximumToken());
BigInteger totalTokens = BigInteger.ZERO;
- for (Range<Token> r : localRanges)
+ for (WeightedRange weightedRange : weightedRanges)
{
- BigInteger right = valueForToken(token(r.right));
- totalTokens = totalTokens.add(right.subtract(valueForToken(r.left)));
+ totalTokens = totalTokens.add(weightedRange.totalTokens(this));
}
+
BigInteger perPart = totalTokens.divide(BigInteger.valueOf(parts));
// the range owned is so tiny we can't split it:
if (perPart.equals(BigInteger.ZERO))
return Collections.singletonList(partitioner.getMaximumToken());
if (dontSplitRanges)
- return splitOwnedRangesNoPartialRanges(localRanges, perPart, parts);
+ return splitOwnedRangesNoPartialRanges(weightedRanges, perPart, parts);
List<Token> boundaries = new ArrayList<>();
BigInteger sum = BigInteger.ZERO;
- for (Range<Token> r : localRanges)
+ for (WeightedRange weightedRange : weightedRanges)
{
- Token right = token(r.right);
- BigInteger currentRangeWidth = valueForToken(right).subtract(valueForToken(r.left)).abs();
- BigInteger left = valueForToken(r.left);
+ BigInteger currentRangeWidth = weightedRange.totalTokens(this);
+ BigInteger left = valueForToken(weightedRange.left());
while (sum.add(currentRangeWidth).compareTo(perPart) >= 0)
{
BigInteger withinRangeBoundary = perPart.subtract(sum);
@@ -155,26 +155,24 @@ public abstract class Splitter
}
boundaries.set(boundaries.size() - 1, partitioner.getMaximumToken());
- assert boundaries.size() == parts : boundaries.size() + "!=" + parts + " " + boundaries + ":" + localRanges;
+ assert boundaries.size() == parts : boundaries.size() + "!=" + parts + " " + boundaries + ":" + weightedRanges;
return boundaries;
}
- private List<Token> splitOwnedRangesNoPartialRanges(List<Range<Token>> localRanges, BigInteger perPart, int parts)
+ private List<Token> splitOwnedRangesNoPartialRanges(List<WeightedRange> weightedRanges, BigInteger perPart, int parts)
{
List<Token> boundaries = new ArrayList<>(parts);
BigInteger sum = BigInteger.ZERO;
int i = 0;
- final int rangesCount = localRanges.size();
+ final int rangesCount = weightedRanges.size();
while (boundaries.size() < parts - 1 && i < rangesCount - 1)
{
- Range<Token> r = localRanges.get(i);
- Range<Token> nextRange = localRanges.get(i + 1);
- Token right = token(r.right);
- Token nextRight = token(nextRange.right);
+ WeightedRange r = weightedRanges.get(i);
+ WeightedRange nextRange = weightedRanges.get(i + 1);
- BigInteger currentRangeWidth = valueForToken(right).subtract(valueForToken(r.left));
- BigInteger nextRangeWidth = valueForToken(nextRight).subtract(valueForToken(nextRange.left));
+ BigInteger currentRangeWidth = r.totalTokens(this);
+ BigInteger nextRangeWidth = nextRange.totalTokens(this);
sum = sum.add(currentRangeWidth);
// does this or next range take us beyond the per part limit?
@@ -187,7 +185,7 @@ public abstract class Splitter
if (diffNext.compareTo(diffCurrent) >= 0)
{
sum = BigInteger.ZERO;
- boundaries.add(right);
+ boundaries.add(token(r.right()));
}
}
i++;
@@ -256,4 +254,61 @@ public abstract class Splitter
}
return subranges;
}
+
+    /**
+     * A token range paired with a weight that scales how many of the range's tokens count towards
+     * {@link Splitter#splitOwnedRanges}.
+     * <p>
+     * Equality and hashing consider only the range, not the weight, so the two stay consistent with each
+     * other as required by the {@link Object#hashCode()} contract.
+     */
+    public static class WeightedRange
+    {
+        private final double weight;
+        private final Range<Token> range;
+
+        public WeightedRange(double weight, Range<Token> range)
+        {
+            this.weight = weight;
+            this.range = range;
+        }
+
+        /**
+         * Returns the absolute token width of the range divided by {@code max(1, (long) (1 / weight))}.
+         * The reciprocal of the weight is truncated towards zero. A weight of 1 or more therefore leaves the
+         * width unchanged, and a weight of 0.3, for example, divides it by 3.
+         *
+         * @param splitter the splitter used to convert the range's tokens to numeric values
+         * @return the weighted number of tokens in this range
+         */
+        public BigInteger totalTokens(Splitter splitter)
+        {
+            BigInteger right = splitter.valueForToken(splitter.token(range.right));
+            BigInteger left = splitter.valueForToken(range.left);
+            BigInteger factor = BigInteger.valueOf(Math.max(1, (long) (1 / weight)));
+            BigInteger size = right.subtract(left);
+            return size.abs().divide(factor);
+        }
+
+        public Token left()
+        {
+            return range.left;
+        }
+
+        public Token right()
+        {
+            return range.right;
+        }
+
+        public Range<Token> range()
+        {
+            return range;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "WeightedRange{" +
+                   "weight=" + weight +
+                   ", range=" + range +
+                   '}';
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (!(o instanceof WeightedRange)) return false;
+            WeightedRange that = (WeightedRange) o;
+            return Objects.equals(range, that.range);
+        }
+
+        // Must hash exactly the fields compared in equals(); weight is deliberately excluded.
+        @Override
+        public int hashCode()
+        {
+            return Objects.hashCode(range);
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/StreamStateStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/StreamStateStore.java b/src/java/org/apache/cassandra/dht/StreamStateStore.java
index e3ea838..3144e81 100644
--- a/src/java/org/apache/cassandra/dht/StreamStateStore.java
+++ b/src/java/org/apache/cassandra/dht/StreamStateStore.java
@@ -19,38 +19,43 @@ package org.apache.cassandra.dht;
import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.streaming.StreamEventHandler;
import org.apache.cassandra.streaming.StreamRequest;
import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.Pair;
/**
* Store and update available ranges (data already received) to system keyspace.
*/
public class StreamStateStore implements StreamEventHandler
{
- public Set<Range<Token>> getAvailableRanges(String keyspace, IPartitioner partitioner)
+ private static final Logger logger = LoggerFactory.getLogger(StreamStateStore.class);
+
+    /**
+     * Returns the ranges of {@code keyspace} recorded in the system keyspace as already received by this
+     * node. This delegates to {@link SystemKeyspace#getAvailableRanges}.
+     *
+     * @param keyspace keyspace name
+     * @param partitioner partitioner passed through to the system keyspace lookup
+     */
+ public RangesAtEndpoint getAvailableRanges(String keyspace, IPartitioner partitioner)
{
return SystemKeyspace.getAvailableRanges(keyspace, partitioner);
}
/**
- * Check if given token's data is available in this node.
+     * Check if given token's data is available in this node. Transientness is not handled in a useful way
+     * here, so this is only used by a legacy test.
*
* @param keyspace keyspace name
* @param token token to check
* @return true if given token in the keyspace is already streamed and ready to be served.
*/
+    @VisibleForTesting
public boolean isDataAvailable(String keyspace, Token token)
{
- Set<Range<Token>> availableRanges = getAvailableRanges(keyspace, token.getPartitioner());
- for (Range<Token> range : availableRanges)
- {
- if (range.contains(token))
- return true;
- }
- return false;
+        // Scan every recorded range, full or transient, and stop at the first one covering the token.
+        for (Range<Token> candidate : getAvailableRanges(keyspace, token.getPartitioner()).ranges())
+        {
+            if (candidate.contains(token))
+                return true;
+        }
+        return false;
}
/**
@@ -73,7 +78,7 @@ public class StreamStateStore implements StreamEventHandler
}
for (StreamRequest request : se.requests)
{
- SystemKeyspace.updateAvailableRanges(request.keyspace, request.ranges);
+ SystemKeyspace.updateAvailableRanges(request.keyspace, request.full.ranges(), request.transientReplicas.ranges());
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
index efd2766..36fc8c2 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
@@ -73,7 +73,7 @@ class ReplicationAwareTokenAllocator<Unit> extends TokenAllocatorBase<Unit>
Map<Unit, UnitInfo<Unit>> unitInfos = createUnitInfos(groups);
if (groups.size() < replicas)
{
- // We need at least replicas groups to do allocation correctly. If there aren't enough,
+ // We need at least replicas groups to do allocation correctly. If there aren't enough,
// use random allocation.
// This part of the code should only be reached via the RATATest. StrategyAdapter should disallow
// token allocation in this case as the algorithm is not able to cover the behavior of NetworkTopologyStrategy.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
index 61082df..ef91fbb 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
@@ -113,7 +113,7 @@ public class TokenAllocation
{
double size = current.size(next);
Token representative = current.getPartitioner().midpoint(current, next);
- for (InetAddressAndPort n : rs.calculateNaturalEndpoints(representative, tokenMetadata))
+ for (InetAddressAndPort n : rs.calculateNaturalReplicas(representative, tokenMetadata).endpoints())
{
Double v = ownership.get(n);
ownership.put(n, v != null ? v + size : size);
@@ -169,7 +169,7 @@ public class TokenAllocation
static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final SimpleStrategy rs, final InetAddressAndPort endpoint)
{
- final int replicas = rs.getReplicationFactor();
+ final int replicas = rs.getReplicationFactor().allReplicas;
return new StrategyAdapter()
{
@@ -196,7 +196,7 @@ public class TokenAllocation
static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final NetworkTopologyStrategy rs, final IEndpointSnitch snitch, final InetAddressAndPort endpoint)
{
final String dc = snitch.getDatacenter(endpoint);
- final int replicas = rs.getReplicationFactor(dc);
+ final int replicas = rs.getReplicationFactor(dc).allReplicas;
if (replicas == 0 || replicas == 1)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/exceptions/UnavailableException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/UnavailableException.java b/src/java/org/apache/cassandra/exceptions/UnavailableException.java
index 7b4edd8..d6e8488 100644
--- a/src/java/org/apache/cassandra/exceptions/UnavailableException.java
+++ b/src/java/org/apache/cassandra/exceptions/UnavailableException.java
@@ -25,14 +25,26 @@ public class UnavailableException extends RequestExecutionException
public final int required;
public final int alive;
- public UnavailableException(ConsistencyLevel consistency, int required, int alive)
+    /**
+     * Creates an exception for a consistency level that cannot be achieved because fewer than
+     * {@code required} replicas are alive. Callers must pass {@code alive < required} (asserted).
+     */
+    public static UnavailableException create(ConsistencyLevel consistency, int required, int alive)
{
- this("Cannot achieve consistency level " + consistency, consistency, required, alive);
+        assert alive < required;
+        return create(consistency, required, 0, alive, 0);
}
- public UnavailableException(ConsistencyLevel consistency, String dc, int required, int alive)
+    /**
+     * Creates an exception explaining why {@code consistency} cannot be achieved.
+     * <p>
+     * If fewer than {@code required} replicas are alive, the message names the consistency level.
+     * Otherwise the shortfall must be in full replicas ({@code aliveFull < requiredFull}, asserted).
+     */
+    public static UnavailableException create(ConsistencyLevel consistency, int required, int requiredFull, int alive, int aliveFull)
{
- this("Cannot achieve consistency level " + consistency + " in DC " + dc, consistency, required, alive);
+        if (required > alive)
+            return new UnavailableException("Cannot achieve consistency level " + consistency, consistency, required, alive);
+        // Enough replicas are alive overall, so we can only get here because too few of them are full.
+        assert requiredFull > aliveFull;
+        return new UnavailableException("Insufficient full replicas", consistency, required, alive);
+    }
+
+    /**
+     * Per-datacenter variant of {@link #create(ConsistencyLevel, int, int, int, int)}. The message names
+     * {@code dc}. If enough replicas are alive, the shortfall must be in full replicas
+     * ({@code aliveFull < requiredFull}, asserted).
+     */
+    public static UnavailableException create(ConsistencyLevel consistency, String dc, int required, int requiredFull, int alive, int aliveFull)
+    {
+        if (required > alive)
+            return new UnavailableException("Cannot achieve consistency level " + consistency + " in DC " + dc, consistency, required, alive);
+        // Enough replicas are alive in this DC, so we can only get here because too few of them are full.
+        assert requiredFull > aliveFull;
+        return new UnavailableException("Insufficient full replicas in DC " + dc, consistency, required, alive);
}
public UnavailableException(String msg, ConsistencyLevel consistency, int required, int alive)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 5646bf6..8546a70 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -144,6 +144,11 @@ public class EndpointState
return rpcState != null && Boolean.parseBoolean(rpcState.value);
}
+    /**
+     * Returns whether {@link #getStatus()} equals {@link VersionedValue#STATUS_NORMAL}.
+     */
+    public boolean isNormalState()
+    {
+        String status = getStatus();
+        return status.equals(VersionedValue.STATUS_NORMAL);
+    }
+
public String getStatus()
{
VersionedValue status = getApplicationState(ApplicationState.STATUS_WITH_PORT);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 0cd1278..c6ad3d9 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -20,11 +20,14 @@ package org.apache.cassandra.hints;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
+import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -39,6 +42,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.HintedHandoffMetrics;
import org.apache.cassandra.metrics.StorageMetrics;
@@ -46,9 +50,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
-import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
-import static com.google.common.collect.Iterables.size;
/**
* A singleton-ish wrapper over various hints components:
@@ -151,7 +153,7 @@ public final class HintsService implements HintsServiceMBean
* @param hostIds host ids of the hint's target nodes
* @param hint the hint to store
*/
- public void write(Iterable<UUID> hostIds, Hint hint)
+ public void write(Collection<UUID> hostIds, Hint hint)
{
if (isShutDown)
throw new IllegalStateException("HintsService is shut down and can't accept new hints");
@@ -161,7 +163,7 @@ public final class HintsService implements HintsServiceMBean
bufferPool.write(hostIds, hint);
- StorageMetrics.totalHints.inc(size(hostIds));
+ StorageMetrics.totalHints.inc(hostIds.size());
}
/**
@@ -183,9 +185,14 @@ public final class HintsService implements HintsServiceMBean
String keyspaceName = hint.mutation.getKeyspaceName();
Token token = hint.mutation.key().getToken();
- Iterable<UUID> hostIds =
- transform(filter(StorageService.instance.getNaturalAndPendingEndpoints(keyspaceName, token), StorageProxy::shouldHint),
- StorageService.instance::getHostIdForEndpoint);
+ EndpointsForToken replicas = StorageService.instance.getNaturalAndPendingReplicasForToken(keyspaceName, token);
+
+ // judicious use of streams: eagerly materializing probably cheaper
+ // than performing filters / translations 2x extra via Iterables.filter/transform
+ List<UUID> hostIds = replicas.stream()
+ .filter(StorageProxy::shouldHint)
+ .map(replica -> StorageService.instance.getHostIdForEndpoint(replica.endpoint()))
+ .collect(Collectors.toList());
write(hostIds, hint);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 044a00b..4eaf1fe 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -69,13 +69,14 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
SerializationHeader header = new SerializationHeader(true, metadata.get(), columns, EncodingStats.NO_STATS);
if (makeRangeAware)
- return SSTableTxnWriter.createRangeAware(metadata, 0, ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, formatType, 0, header);
+ return SSTableTxnWriter.createRangeAware(metadata, 0, ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, false, formatType, 0, header);
return SSTableTxnWriter.create(metadata,
createDescriptor(directory, metadata.keyspace, metadata.name, formatType),
0,
ActiveRepairService.UNREPAIRED_SSTABLE,
ActiveRepairService.NO_PENDING_REPAIR,
+ false,
0,
header,
Collections.emptySet());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index f2605fb..055bf24 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -23,6 +23,7 @@ import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
+import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
import com.google.common.collect.Sets;
@@ -46,6 +47,9 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.memory.HeapAllocator;
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
+
/**
* This class is built on top of the SequenceFile. It stores
* data on disk in sorted fashion. However the sorting is upto
@@ -350,4 +354,13 @@ public abstract class SSTable
{
return AbstractBounds.bounds(first.getToken(), true, last.getToken(), true);
}
+
+    /**
+     * Validates the combination of repair-related sstable metadata.
+     *
+     * @param repairedAt repair timestamp, or {@code UNREPAIRED_SSTABLE}
+     * @param pendingRepair pending repair session id, or {@code NO_PENDING_REPAIR}
+     * @param isTransient whether the sstable is marked transient
+     * @throws IllegalArgumentException if a pending repair is set on an sstable whose {@code repairedAt} is not
+     *         {@code UNREPAIRED_SSTABLE}, or if {@code isTransient} is set without a pending repair
+     */
+    public static void validateRepairedMetadata(long repairedAt, UUID pendingRepair, boolean isTransient)
+    {
+        boolean hasPendingRepair = pendingRepair != NO_PENDING_REPAIR;
+        boolean markedRepaired = repairedAt != UNREPAIRED_SSTABLE;
+        Preconditions.checkArgument(!(hasPendingRepair && markedRepaired),
+                                    "pendingRepair cannot be set on a repaired sstable");
+        Preconditions.checkArgument(hasPendingRepair || !isTransient,
+                                    "isTransient can only be true for sstables pending repair");
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 4ba0533..ec2a700 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -126,7 +126,7 @@ public class SSTableLoader implements StreamEventHandler
for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : ranges.entrySet())
{
InetAddressAndPort endpoint = entry.getKey();
- Collection<Range<Token>> tokenRanges = entry.getValue();
+ List<Range<Token>> tokenRanges = Range.normalize(entry.getValue());
List<SSTableReader.PartitionPositionBounds> sstableSections = sstable.getPositionsForRanges(tokenRanges);
long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 60b8962..cfb1365 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -99,10 +99,10 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
}
+    /**
+     * Creates a writer inside a new offline {@link LifecycleTransaction} (operation type WRITE). The
+     * underlying writer comes from {@code cfs.createSSTableMultiWriter}, which receives {@code repairedAt},
+     * {@code pendingRepair}, {@code isTransient}, {@code sstableLevel} and {@code header} unchanged.
+     */
@SuppressWarnings("resource") // log and writer closed during doPostCleanup
- public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, int sstableLevel, SerializationHeader header)
+ public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, int sstableLevel, SerializationHeader header)
{
LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
- SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, sstableLevel, header, txn);
+ SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, sstableLevel, header, txn);
return new SSTableTxnWriter(txn, writer);
}
@@ -112,6 +112,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
long keyCount,
long repairedAt,
UUID pendingRepair,
+ boolean isTransient,
SSTableFormat.Type type,
int sstableLevel,
SerializationHeader header)
@@ -122,7 +123,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
SSTableMultiWriter writer;
try
{
- writer = new RangeAwareSSTableWriter(cfs, keyCount, repairedAt, pendingRepair, type, sstableLevel, 0, txn, header);
+ writer = new RangeAwareSSTableWriter(cfs, keyCount, repairedAt, pendingRepair, isTransient, type, sstableLevel, 0, txn, header);
}
catch (IOException e)
{
@@ -140,6 +141,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
long keyCount,
long repairedAt,
UUID pendingRepair,
+ boolean isTransient,
int sstableLevel,
SerializationHeader header,
Collection<Index> indexes)
@@ -147,12 +149,12 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
// if the column family store does not exist, we create a new default SSTableMultiWriter to use:
LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(sstableLevel);
- SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, indexes, txn);
+ SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, collector, header, indexes, txn);
return new SSTableTxnWriter(txn, writer);
}
+    /**
+     * Convenience overload of
+     * {@link #create(ColumnFamilyStore, Descriptor, long, long, UUID, boolean, int, SerializationHeader)}
+     * that uses sstable level 0.
+     */
- public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor desc, long keyCount, long repairedAt, UUID pendingRepair, SerializationHeader header)
+ public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor desc, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, SerializationHeader header)
{
- return create(cfs, desc, keyCount, repairedAt, pendingRepair, 0, header);
+ return create(cfs, desc, keyCount, repairedAt, pendingRepair, isTransient, 0, header);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index a40ec18..eb5c5fe 100644
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -111,13 +111,14 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter
long keyCount,
long repairedAt,
UUID pendingRepair,
+ boolean isTransient,
TableMetadataRef metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
Collection<Index> indexes,
LifecycleTransaction txn)
{
- SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, indexes, txn);
+ SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, indexes, txn);
return new SimpleSSTableMultiWriter(writer, txn);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
index f289fe3..29fa573 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@ -44,6 +44,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
private final long estimatedKeys;
private final long repairedAt;
private final UUID pendingRepair;
+ private final boolean isTransient;
private final SSTableFormat.Type format;
private final SerializationHeader header;
private final LifecycleTransaction txn;
@@ -53,7 +54,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
private final List<SSTableReader> finishedReaders = new ArrayList<>();
private SSTableMultiWriter currentWriter = null;
- public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, UUID pendingRepair, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
+ public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, UUID pendingRepair, boolean isTransient, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
{
DiskBoundaries db = cfs.getDiskBoundaries();
directories = db.directories;
@@ -62,6 +63,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
this.estimatedKeys = estimatedKeys / directories.size();
this.repairedAt = repairedAt;
this.pendingRepair = pendingRepair;
+ this.isTransient = isTransient;
this.format = format;
this.txn = txn;
this.header = header;
@@ -73,7 +75,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
throw new IOException(String.format("Insufficient disk space to store %s",
FBUtilities.prettyPrintMemory(totalSize)));
Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(localDir), format);
- currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, sstableLevel, header, txn);
+ currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, sstableLevel, header, txn);
}
}
@@ -95,7 +97,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
finishedWriters.add(currentWriter);
Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex)), format);
- currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, sstableLevel, header, txn);
+ currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, sstableLevel, header, txn);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 2fade21..edb3afa 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1852,6 +1852,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
return sstableMetadata.repairedAt;
}
+ public boolean isTransient()
+ {
+ return sstableMetadata.isTransient;
+ }
+
public boolean intersects(Collection<Range<Token>> ranges)
{
Bounds<Token> range = new Bounds<>(first.getToken(), last.getToken());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 1e183e2..cca59cf 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -55,6 +55,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
{
protected long repairedAt;
protected UUID pendingRepair;
+ protected boolean isTransient;
protected long maxDataAge = -1;
protected final long keyCount;
protected final MetadataCollector metadataCollector;
@@ -77,6 +78,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
long keyCount,
long repairedAt,
UUID pendingRepair,
+ boolean isTransient,
TableMetadataRef metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
@@ -86,6 +88,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
this.keyCount = keyCount;
this.repairedAt = repairedAt;
this.pendingRepair = pendingRepair;
+ this.isTransient = isTransient;
this.metadataCollector = metadataCollector;
this.header = header;
this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata.get(), descriptor.version, header);
@@ -96,6 +99,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
Long keyCount,
Long repairedAt,
UUID pendingRepair,
+ boolean isTransient,
TableMetadataRef metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
@@ -103,20 +107,21 @@ public abstract class SSTableWriter extends SSTable implements Transactional
LifecycleTransaction txn)
{
Factory writerFactory = descriptor.getFormat().getWriterFactory();
- return writerFactory.open(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn);
+ return writerFactory.open(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn);
}
public static SSTableWriter create(Descriptor descriptor,
long keyCount,
long repairedAt,
UUID pendingRepair,
+ boolean isTransient,
int sstableLevel,
SerializationHeader header,
Collection<Index> indexes,
LifecycleTransaction txn)
{
TableMetadataRef metadata = Schema.instance.getTableMetadataRef(descriptor);
- return create(metadata, descriptor, keyCount, repairedAt, pendingRepair, sstableLevel, header, indexes, txn);
+ return create(metadata, descriptor, keyCount, repairedAt, pendingRepair, isTransient, sstableLevel, header, indexes, txn);
}
public static SSTableWriter create(TableMetadataRef metadata,
@@ -124,13 +129,14 @@ public abstract class SSTableWriter extends SSTable implements Transactional
long keyCount,
long repairedAt,
UUID pendingRepair,
+ boolean isTransient,
int sstableLevel,
SerializationHeader header,
Collection<Index> indexes,
LifecycleTransaction txn)
{
MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(sstableLevel);
- return create(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, indexes, txn);
+ return create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, collector, header, indexes, txn);
}
@VisibleForTesting
@@ -138,11 +144,12 @@ public abstract class SSTableWriter extends SSTable implements Transactional
long keyCount,
long repairedAt,
UUID pendingRepair,
+ boolean isTransient,
SerializationHeader header,
Collection<Index> indexes,
LifecycleTransaction txn)
{
- return create(descriptor, keyCount, repairedAt, pendingRepair, 0, header, indexes, txn);
+ return create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, 0, header, indexes, txn);
}
private static Set<Component> components(TableMetadata metadata)
@@ -309,6 +316,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
metadata().params.bloomFilterFpChance,
repairedAt,
pendingRepair,
+ isTransient,
header);
}
@@ -338,6 +346,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
long keyCount,
long repairedAt,
UUID pendingRepair,
+ boolean isTransient,
TableMetadataRef metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java
index 1d965ce..9b82c14 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@ -55,6 +55,8 @@ public abstract class Version
public abstract boolean hasPendingRepair();
+ public abstract boolean hasIsTransient();
+
public abstract boolean hasMetadataChecksum();
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org