You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:36:05 UTC
[13/18] cassandra git commit: Transient Replication and Cheap Quorums
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index cb2ea46..c63f4f3 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -20,10 +20,12 @@ package org.apache.cassandra.locator;
import java.util.*;
import java.util.Map.Entry;
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.dht.Datacenters;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.TokenMetadata.Topology;
@@ -49,14 +51,17 @@ import com.google.common.collect.Multimap;
*/
public class NetworkTopologyStrategy extends AbstractReplicationStrategy
{
- private final Map<String, Integer> datacenters;
+ private final Map<String, ReplicationFactor> datacenters;
+ private final ReplicationFactor aggregateRf;
private static final Logger logger = LoggerFactory.getLogger(NetworkTopologyStrategy.class);
public NetworkTopologyStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) throws ConfigurationException
{
super(keyspaceName, tokenMetadata, snitch, configOptions);
- Map<String, Integer> newDatacenters = new HashMap<String, Integer>();
+ int replicas = 0;
+ int trans = 0;
+ Map<String, ReplicationFactor> newDatacenters = new HashMap<>();
if (configOptions != null)
{
for (Entry<String, String> entry : configOptions.entrySet())
@@ -64,12 +69,15 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
String dc = entry.getKey();
if (dc.equalsIgnoreCase("replication_factor"))
throw new ConfigurationException("replication_factor is an option for SimpleStrategy, not NetworkTopologyStrategy");
- Integer replicas = Integer.valueOf(entry.getValue());
- newDatacenters.put(dc, replicas);
+ ReplicationFactor rf = ReplicationFactor.fromString(entry.getValue());
+ replicas += rf.allReplicas;
+ trans += rf.transientReplicas();
+ newDatacenters.put(dc, rf);
}
}
datacenters = Collections.unmodifiableMap(newDatacenters);
+ aggregateRf = ReplicationFactor.withTransient(replicas, trans);
logger.info("Configured datacenter replicas are {}", FBUtilities.toString(datacenters));
}
@@ -79,7 +87,8 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
private static final class DatacenterEndpoints
{
/** List accepted endpoints get pushed into. */
- Set<InetAddressAndPort> endpoints;
+ EndpointsForRange.Mutable replicas;
+
/**
* Racks encountered so far. Replicas are put into separate racks while possible.
* For efficiency the set is shared between the instances, using the location pair (dc, rack) to make sure
@@ -90,41 +99,51 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
/** Number of replicas left to fill from this DC. */
int rfLeft;
int acceptableRackRepeats;
+ int transients;
- DatacenterEndpoints(int rf, int rackCount, int nodeCount, Set<InetAddressAndPort> endpoints, Set<Pair<String, String>> racks)
+ DatacenterEndpoints(ReplicationFactor rf, int rackCount, int nodeCount, EndpointsForRange.Mutable replicas, Set<Pair<String, String>> racks)
{
- this.endpoints = endpoints;
+ this.replicas = replicas;
this.racks = racks;
// If there aren't enough nodes in this DC to fill the RF, the number of nodes is the effective RF.
- this.rfLeft = Math.min(rf, nodeCount);
+ this.rfLeft = Math.min(rf.allReplicas, nodeCount);
// If there aren't enough racks in this DC to fill the RF, we'll still use at least one node from each rack,
// and the difference is to be filled by the first encountered nodes.
- acceptableRackRepeats = rf - rackCount;
+ acceptableRackRepeats = rf.allReplicas - rackCount;
+
+ // if we have fewer replicas than rf calls for, reduce transients accordingly
+ int reduceTransients = rf.allReplicas - this.rfLeft;
+ transients = Math.max(rf.transientReplicas() - reduceTransients, 0);
+ ReplicationFactor.validate(rfLeft, transients);
}
/**
- * Attempts to add an endpoint to the replicas for this datacenter, adding to the endpoints set if successful.
+ * Attempts to add an endpoint to the replicas for this datacenter, adding to the replicas set if successful.
* Returns true if the endpoint was added, and this datacenter does not require further replicas.
*/
- boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, Pair<String,String> location)
+ boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, Pair<String,String> location, Range<Token> replicatedRange)
{
if (done())
return false;
+ if (replicas.endpoints().contains(ep))
+ // Cannot repeat a node.
+ return false;
+
+ Replica replica = new Replica(ep, replicatedRange, rfLeft > transients);
+
if (racks.add(location))
{
// New rack.
--rfLeft;
- boolean added = endpoints.add(ep);
- assert added;
+ replicas.add(replica, Conflict.NONE);
return done();
}
if (acceptableRackRepeats <= 0)
// There must be rfLeft distinct racks left, do not add any more rack repeats.
return false;
- if (!endpoints.add(ep))
- // Cannot repeat a node.
- return false;
+
+ replicas.add(replica, Conflict.NONE);
// Added a node that is from an already met rack to match RF when there aren't enough racks.
--acceptableRackRepeats;
--rfLeft;
@@ -141,10 +160,15 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
/**
* calculate endpoints in one pass through the tokens by tracking our progress in each DC.
*/
- public List<InetAddressAndPort> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata)
+ public EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata)
{
// we want to preserve insertion order so that the first added endpoint becomes primary
- Set<InetAddressAndPort> replicas = new LinkedHashSet<>();
+ ArrayList<Token> sortedTokens = tokenMetadata.sortedTokens();
+ Token replicaEnd = TokenMetadata.firstToken(sortedTokens, searchToken);
+ Token replicaStart = tokenMetadata.getPredecessor(replicaEnd);
+ Range<Token> replicatedRange = new Range<>(replicaStart, replicaEnd);
+
+ EndpointsForRange.Mutable builder = new EndpointsForRange.Mutable(replicatedRange);
Set<Pair<String, String>> seenRacks = new HashSet<>();
Topology topology = tokenMetadata.getTopology();
@@ -158,31 +182,31 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
Map<String, DatacenterEndpoints> dcs = new HashMap<>(datacenters.size() * 2);
// Create a DatacenterEndpoints object for each non-empty DC.
- for (Map.Entry<String, Integer> en : datacenters.entrySet())
+ for (Map.Entry<String, ReplicationFactor> en : datacenters.entrySet())
{
String dc = en.getKey();
- int rf = en.getValue();
+ ReplicationFactor rf = en.getValue();
int nodeCount = sizeOrZero(allEndpoints.get(dc));
- if (rf <= 0 || nodeCount <= 0)
+ if (rf.allReplicas <= 0 || nodeCount <= 0)
continue;
- DatacenterEndpoints dcEndpoints = new DatacenterEndpoints(rf, sizeOrZero(racks.get(dc)), nodeCount, replicas, seenRacks);
+ DatacenterEndpoints dcEndpoints = new DatacenterEndpoints(rf, sizeOrZero(racks.get(dc)), nodeCount, builder, seenRacks);
dcs.put(dc, dcEndpoints);
++dcsToFill;
}
- Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false);
+ Iterator<Token> tokenIter = TokenMetadata.ringIterator(sortedTokens, searchToken, false);
while (dcsToFill > 0 && tokenIter.hasNext())
{
Token next = tokenIter.next();
InetAddressAndPort ep = tokenMetadata.getEndpoint(next);
Pair<String, String> location = topology.getLocation(ep);
DatacenterEndpoints dcEndpoints = dcs.get(location.left);
- if (dcEndpoints != null && dcEndpoints.addEndpointAndCheckIfDone(ep, location))
+ if (dcEndpoints != null && dcEndpoints.addEndpointAndCheckIfDone(ep, location, replicatedRange))
--dcsToFill;
}
- return new ArrayList<>(replicas);
+ return builder.asImmutableView();
}
private int sizeOrZero(Multimap<?, ?> collection)
@@ -195,18 +219,15 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
return collection != null ? collection.size() : 0;
}
- public int getReplicationFactor()
+ public ReplicationFactor getReplicationFactor()
{
- int total = 0;
- for (int repFactor : datacenters.values())
- total += repFactor;
- return total;
+ return aggregateRf;
}
- public int getReplicationFactor(String dc)
+ public ReplicationFactor getReplicationFactor(String dc)
{
- Integer replicas = datacenters.get(dc);
- return replicas == null ? 0 : replicas;
+ ReplicationFactor replicas = datacenters.get(dc);
+ return replicas == null ? ReplicationFactor.ZERO : replicas;
}
public Set<String> getDatacenters()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
index 93e629e..449c51e 100644
--- a/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
@@ -21,9 +21,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Collection;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.dht.Token;
@@ -36,27 +36,32 @@ import org.apache.cassandra.dht.Token;
*/
public class OldNetworkTopologyStrategy extends AbstractReplicationStrategy
{
+ private final ReplicationFactor rf;
public OldNetworkTopologyStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
{
super(keyspaceName, tokenMetadata, snitch, configOptions);
+ this.rf = ReplicationFactor.fromString(this.configOptions.get("replication_factor"));
}
- public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+ public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
{
- int replicas = getReplicationFactor();
- List<InetAddressAndPort> endpoints = new ArrayList<>(replicas);
ArrayList<Token> tokens = metadata.sortedTokens();
-
if (tokens.isEmpty())
- return endpoints;
+ return EndpointsForRange.empty(new Range<>(metadata.partitioner.getMinimumToken(), metadata.partitioner.getMinimumToken()));
Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token, false);
Token primaryToken = iter.next();
- endpoints.add(metadata.getEndpoint(primaryToken));
+ Token previousToken = metadata.getPredecessor(primaryToken);
+ Range<Token> tokenRange = new Range<>(previousToken, primaryToken);
+
+ EndpointsForRange.Builder replicas = EndpointsForRange.builder(tokenRange, rf.allReplicas);
+
+ assert !rf.hasTransientReplicas() : "support transient replicas";
+ replicas.add(new Replica(metadata.getEndpoint(primaryToken), previousToken, primaryToken, true));
boolean bDataCenter = false;
boolean bOtherRack = false;
- while (endpoints.size() < replicas && iter.hasNext())
+ while (replicas.size() < rf.allReplicas && iter.hasNext())
{
// First try to find one in a different data center
Token t = iter.next();
@@ -65,7 +70,7 @@ public class OldNetworkTopologyStrategy extends AbstractReplicationStrategy
// If we have already found something in a diff datacenter no need to find another
if (!bDataCenter)
{
- endpoints.add(metadata.getEndpoint(t));
+ replicas.add(new Replica(metadata.getEndpoint(t), previousToken, primaryToken, true));
bDataCenter = true;
}
continue;
@@ -77,7 +82,7 @@ public class OldNetworkTopologyStrategy extends AbstractReplicationStrategy
// If we have already found something in a diff rack no need to find another
if (!bOtherRack)
{
- endpoints.add(metadata.getEndpoint(t));
+ replicas.add(new Replica(metadata.getEndpoint(t), previousToken, primaryToken, true));
bOtherRack = true;
}
}
@@ -86,23 +91,24 @@ public class OldNetworkTopologyStrategy extends AbstractReplicationStrategy
// If we found N number of nodes we are good. This loop will just exit. Otherwise just
// loop through the list and add until we have N nodes.
- if (endpoints.size() < replicas)
+ if (replicas.size() < rf.allReplicas)
{
iter = TokenMetadata.ringIterator(tokens, token, false);
- while (endpoints.size() < replicas && iter.hasNext())
+ while (replicas.size() < rf.allReplicas && iter.hasNext())
{
Token t = iter.next();
- if (!endpoints.contains(metadata.getEndpoint(t)))
- endpoints.add(metadata.getEndpoint(t));
+ Replica replica = new Replica(metadata.getEndpoint(t), previousToken, primaryToken, true);
+ if (!replicas.containsEndpoint(replica.endpoint()))
+ replicas.add(replica);
}
}
- return endpoints;
+ return replicas.build();
}
- public int getReplicationFactor()
+ public ReplicationFactor getReplicationFactor()
{
- return Integer.parseInt(this.configOptions.get("replication_factor"));
+ return rf;
}
public void validateOptions() throws ConfigurationException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PendingRangeMaps.java b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
index 92307a3..b8b7bc6 100644
--- a/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
+++ b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
@@ -23,166 +23,147 @@ package org.apache.cassandra.locator;
import com.google.common.collect.Iterators;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
import java.util.*;
-public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<InetAddressAndPort>>>
+public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, EndpointsForRange.Mutable>>
{
- private static final Logger logger = LoggerFactory.getLogger(PendingRangeMaps.class);
-
/**
* We have four NavigableMaps to be able to search for ranges containing a token efficiently.
*
* First two are for non-wrap-around ranges, and the last two are for wrap-around ranges.
*/
// ascendingMap will sort the ranges by the ascending order of right token
- final NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMap;
+ private final NavigableMap<Range<Token>, EndpointsForRange.Mutable> ascendingMap;
+
/**
* sorting end ascending, if ends are same, sorting begin descending, so that token (end, end) will
* come before (begin, end] with the same end, and (begin, end) will be selected in the tailMap.
*/
- static final Comparator<Range<Token>> ascendingComparator = new Comparator<Range<Token>>()
- {
- @Override
- public int compare(Range<Token> o1, Range<Token> o2)
- {
- int res = o1.right.compareTo(o2.right);
- if (res != 0)
- return res;
+ private static final Comparator<Range<Token>> ascendingComparator = (o1, o2) -> {
+ int res = o1.right.compareTo(o2.right);
+ if (res != 0)
+ return res;
- return o2.left.compareTo(o1.left);
- }
- };
+ return o2.left.compareTo(o1.left);
+ };
// descendingMap will sort the ranges by the descending order of left token
- final NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMap;
+ private final NavigableMap<Range<Token>, EndpointsForRange.Mutable> descendingMap;
+
/**
* sorting begin descending, if begins are same, sorting end descending, so that token (begin, begin) will
* come after (begin, end] with the same begin, and (begin, end) won't be selected in the tailMap.
*/
- static final Comparator<Range<Token>> descendingComparator = new Comparator<Range<Token>>()
- {
- @Override
- public int compare(Range<Token> o1, Range<Token> o2)
- {
- int res = o2.left.compareTo(o1.left);
- if (res != 0)
- return res;
+ private static final Comparator<Range<Token>> descendingComparator = (o1, o2) -> {
+ int res = o2.left.compareTo(o1.left);
+ if (res != 0)
+ return res;
- // if left tokens are same, sort by the descending of the right tokens.
- return o2.right.compareTo(o1.right);
- }
- };
+ // if left tokens are same, sort by the descending of the right tokens.
+ return o2.right.compareTo(o1.right);
+ };
// these two maps are for wrap-around ranges.
- final NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMapForWrapAround;
+ private final NavigableMap<Range<Token>, EndpointsForRange.Mutable> ascendingMapForWrapAround;
+
/**
* for wrap around range (begin, end], which begin > end.
* Sorting end ascending, if ends are same, sorting begin ascending,
* so that token (end, end) will come before (begin, end] with the same end, and (begin, end] will be selected in
* the tailMap.
*/
- static final Comparator<Range<Token>> ascendingComparatorForWrapAround = new Comparator<Range<Token>>()
- {
- @Override
- public int compare(Range<Token> o1, Range<Token> o2)
- {
- int res = o1.right.compareTo(o2.right);
- if (res != 0)
- return res;
+ private static final Comparator<Range<Token>> ascendingComparatorForWrapAround = (o1, o2) -> {
+ int res = o1.right.compareTo(o2.right);
+ if (res != 0)
+ return res;
- return o1.left.compareTo(o2.left);
- }
+ return o1.left.compareTo(o2.left);
};
- final NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMapForWrapAround;
+ private final NavigableMap<Range<Token>, EndpointsForRange.Mutable> descendingMapForWrapAround;
+
/**
* for wrap around ranges, which begin > end.
* Sorting begin descending, if begins are same, sorting end ascending, so that token (begin, begin) will come after (begin, end] with the same begin,
* and (begin, end) won't be selected in the tailMap.
*/
- static final Comparator<Range<Token>> descendingComparatorForWrapAround = new Comparator<Range<Token>>()
- {
- @Override
- public int compare(Range<Token> o1, Range<Token> o2)
- {
- int res = o2.left.compareTo(o1.left);
- if (res != 0)
- return res;
- return o1.right.compareTo(o2.right);
- }
+ private static final Comparator<Range<Token>> descendingComparatorForWrapAround = (o1, o2) -> {
+ int res = o2.left.compareTo(o1.left);
+ if (res != 0)
+ return res;
+ return o1.right.compareTo(o2.right);
};
public PendingRangeMaps()
{
- this.ascendingMap = new TreeMap<Range<Token>, List<InetAddressAndPort>>(ascendingComparator);
- this.descendingMap = new TreeMap<Range<Token>, List<InetAddressAndPort>>(descendingComparator);
- this.ascendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddressAndPort>>(ascendingComparatorForWrapAround);
- this.descendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddressAndPort>>(descendingComparatorForWrapAround);
+ this.ascendingMap = new TreeMap<>(ascendingComparator);
+ this.descendingMap = new TreeMap<>(descendingComparator);
+ this.ascendingMapForWrapAround = new TreeMap<>(ascendingComparatorForWrapAround);
+ this.descendingMapForWrapAround = new TreeMap<>(descendingComparatorForWrapAround);
}
static final void addToMap(Range<Token> range,
- InetAddressAndPort address,
- NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMap,
- NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMap)
+ Replica replica,
+ NavigableMap<Range<Token>, EndpointsForRange.Mutable> ascendingMap,
+ NavigableMap<Range<Token>, EndpointsForRange.Mutable> descendingMap)
{
- List<InetAddressAndPort> addresses = ascendingMap.get(range);
- if (addresses == null)
+ EndpointsForRange.Mutable replicas = ascendingMap.get(range);
+ if (replicas == null)
{
- addresses = new ArrayList<>(1);
- ascendingMap.put(range, addresses);
- descendingMap.put(range, addresses);
+ replicas = new EndpointsForRange.Mutable(range,1);
+ ascendingMap.put(range, replicas);
+ descendingMap.put(range, replicas);
}
- addresses.add(address);
+ replicas.add(replica, Conflict.DUPLICATE);
}
- public void addPendingRange(Range<Token> range, InetAddressAndPort address)
+ public void addPendingRange(Range<Token> range, Replica replica)
{
if (Range.isWrapAround(range.left, range.right))
{
- addToMap(range, address, ascendingMapForWrapAround, descendingMapForWrapAround);
+ addToMap(range, replica, ascendingMapForWrapAround, descendingMapForWrapAround);
}
else
{
- addToMap(range, address, ascendingMap, descendingMap);
+ addToMap(range, replica, ascendingMap, descendingMap);
}
}
- static final void addIntersections(Set<InetAddressAndPort> endpointsToAdd,
- NavigableMap<Range<Token>, List<InetAddressAndPort>> smallerMap,
- NavigableMap<Range<Token>, List<InetAddressAndPort>> biggerMap)
+ static final void addIntersections(EndpointsForToken.Builder replicasToAdd,
+ NavigableMap<Range<Token>, EndpointsForRange.Mutable> smallerMap,
+ NavigableMap<Range<Token>, EndpointsForRange.Mutable> biggerMap)
{
// find the intersection of two sets
for (Range<Token> range : smallerMap.keySet())
{
- List<InetAddressAndPort> addresses = biggerMap.get(range);
- if (addresses != null)
+ EndpointsForRange.Mutable replicas = biggerMap.get(range);
+ if (replicas != null)
{
- endpointsToAdd.addAll(addresses);
+ replicasToAdd.addAll(replicas);
}
}
}
- public Collection<InetAddressAndPort> pendingEndpointsFor(Token token)
+ public EndpointsForToken pendingEndpointsFor(Token token)
{
- Set<InetAddressAndPort> endpoints = new HashSet<>();
+ EndpointsForToken.Builder replicas = EndpointsForToken.builder(token);
- Range searchRange = new Range(token, token);
+ Range<Token> searchRange = new Range<>(token, token);
// search for non-wrap-around maps
- NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingTailMap = ascendingMap.tailMap(searchRange, true);
- NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingTailMap = descendingMap.tailMap(searchRange, false);
+ NavigableMap<Range<Token>, EndpointsForRange.Mutable> ascendingTailMap = ascendingMap.tailMap(searchRange, true);
+ NavigableMap<Range<Token>, EndpointsForRange.Mutable> descendingTailMap = descendingMap.tailMap(searchRange, false);
// add intersections of two maps
if (ascendingTailMap.size() < descendingTailMap.size())
{
- addIntersections(endpoints, ascendingTailMap, descendingTailMap);
+ addIntersections(replicas, ascendingTailMap, descendingTailMap);
}
else
{
- addIntersections(endpoints, descendingTailMap, ascendingTailMap);
+ addIntersections(replicas, descendingTailMap, ascendingTailMap);
}
// search for wrap-around sets
@@ -190,29 +171,29 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
descendingTailMap = descendingMapForWrapAround.tailMap(searchRange, false);
// add them since they are all necessary.
- for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : ascendingTailMap.entrySet())
+ for (Map.Entry<Range<Token>, EndpointsForRange.Mutable> entry : ascendingTailMap.entrySet())
{
- endpoints.addAll(entry.getValue());
+ replicas.addAll(entry.getValue());
}
- for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : descendingTailMap.entrySet())
+ for (Map.Entry<Range<Token>, EndpointsForRange.Mutable> entry : descendingTailMap.entrySet())
{
- endpoints.addAll(entry.getValue());
+ replicas.addAll(entry.getValue());
}
- return endpoints;
+ return replicas.build();
}
public String printPendingRanges()
{
StringBuilder sb = new StringBuilder();
- for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : this)
+ for (Map.Entry<Range<Token>, EndpointsForRange.Mutable> entry : this)
{
Range<Token> range = entry.getKey();
- for (InetAddressAndPort address : entry.getValue())
+ for (Replica replica : entry.getValue())
{
- sb.append(address).append(':').append(range);
+ sb.append(replica).append(':').append(range);
sb.append(System.getProperty("line.separator"));
}
}
@@ -221,7 +202,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
}
@Override
- public Iterator<Map.Entry<Range<Token>, List<InetAddressAndPort>>> iterator()
+ public Iterator<Map.Entry<Range<Token>, EndpointsForRange.Mutable>> iterator()
{
return Iterators.concat(ascendingMap.entrySet().iterator(), ascendingMapForWrapAround.entrySet().iterator());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
new file mode 100644
index 0000000..74828ad
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+import static org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict.*;
+
+/**
+ * A ReplicaCollection for Ranges occurring at an endpoint. All Replica will be for the same endpoint,
+ * and must be unique Ranges (though overlapping ranges are presently permitted, these should probably not be permitted to occur)
+ */
+public class RangesAtEndpoint extends AbstractReplicaCollection<RangesAtEndpoint>
+{
+ private static final Map<Range<Token>, Replica> EMPTY_MAP = Collections.unmodifiableMap(new LinkedHashMap<>());
+
+ private final InetAddressAndPort endpoint;
+ private volatile Map<Range<Token>, Replica> byRange;
+ private volatile RangesAtEndpoint fullRanges;
+ private volatile RangesAtEndpoint transRanges;
+
+ private RangesAtEndpoint(InetAddressAndPort endpoint, List<Replica> list, boolean isSnapshot)
+ {
+ this(endpoint, list, isSnapshot, null);
+ }
+ private RangesAtEndpoint(InetAddressAndPort endpoint, List<Replica> list, boolean isSnapshot, Map<Range<Token>, Replica> byRange)
+ {
+ super(list, isSnapshot);
+ this.endpoint = endpoint;
+ this.byRange = byRange;
+ assert endpoint != null;
+ }
+
+ public InetAddressAndPort endpoint()
+ {
+ return endpoint;
+ }
+
+ @Override
+ public Set<InetAddressAndPort> endpoints()
+ {
+ return Collections.unmodifiableSet(list.isEmpty()
+ ? Collections.emptySet()
+ : Collections.singleton(endpoint)
+ );
+ }
+
+ public Set<Range<Token>> ranges()
+ {
+ return byRange().keySet();
+ }
+
+ public Map<Range<Token>, Replica> byRange()
+ {
+ Map<Range<Token>, Replica> map = byRange;
+ if (map == null)
+ byRange = map = buildByRange(list);
+ return map;
+ }
+
+ @Override
+ protected RangesAtEndpoint snapshot(List<Replica> subList)
+ {
+ if (subList.isEmpty()) return empty(endpoint);
+ return new RangesAtEndpoint(endpoint, subList, true);
+ }
+
+ @Override
+ public RangesAtEndpoint self()
+ {
+ return this;
+ }
+
+ @Override
+ public ReplicaCollection.Mutable<RangesAtEndpoint> newMutable(int initialCapacity)
+ {
+ return new Mutable(endpoint, initialCapacity);
+ }
+
+ @Override
+ public boolean contains(Replica replica)
+ {
+ return replica != null
+ && Objects.equals(
+ byRange().get(replica.range()),
+ replica);
+ }
+
+ public RangesAtEndpoint full()
+ {
+ RangesAtEndpoint coll = fullRanges; // single read of the volatile cache field; only this local is tested and returned
+ if (coll == null) // testing the field again could observe a concurrent write, skip the assignment and return null
+ fullRanges = coll = filter(Replica::isFull);
+ return coll;
+ }
+
+ public RangesAtEndpoint trans()
+ {
+ RangesAtEndpoint coll = transRanges; // single read of the volatile cache field; only this local is tested and returned
+ if (coll == null) // testing the field again could observe a concurrent write, skip the assignment and return null
+ transRanges = coll = filter(Replica::isTransient);
+ return coll;
+ }
+
+ public Collection<Range<Token>> fullRanges()
+ {
+ return full().ranges();
+ }
+
+ public Collection<Range<Token>> transientRanges()
+ {
+ return trans().ranges();
+ }
+
+ public boolean contains(Range<Token> range, boolean isFull)
+ {
+ Replica replica = byRange().get(range);
+ return replica != null && replica.isFull() == isFull;
+ }
+
+ private static Map<Range<Token>, Replica> buildByRange(List<Replica> list)
+ {
+ // TODO: implement a delegating map that uses our superclass' list, and is immutable
+ Map<Range<Token>, Replica> byRange = new LinkedHashMap<>(list.size());
+ for (Replica replica : list)
+ {
+ Replica prev = byRange.put(replica.range(), replica);
+ assert prev == null : "duplicate range in RangesAtEndpoint: " + prev + " and " + replica;
+ }
+
+ return Collections.unmodifiableMap(byRange);
+ }
+
+ public static Collector<Replica, Builder, RangesAtEndpoint> collector(InetAddressAndPort endpoint)
+ {
+ return collector(ImmutableSet.of(), () -> new Builder(endpoint));
+ }
+
+ public static class Mutable extends RangesAtEndpoint implements ReplicaCollection.Mutable<RangesAtEndpoint>
+ {
+ boolean hasSnapshot;
+ public Mutable(InetAddressAndPort endpoint) { this(endpoint, 0); }
+ public Mutable(InetAddressAndPort endpoint, int capacity) { super(endpoint, new ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+
+ public void add(Replica replica, Conflict ignoreConflict)
+ {
+ if (hasSnapshot) throw new IllegalStateException();
+ Preconditions.checkNotNull(replica);
+ if (!Objects.equals(super.endpoint, replica.endpoint()))
+ throw new IllegalArgumentException("Replica " + replica + " has incorrect endpoint (expected " + super.endpoint + ")");
+
+ Replica prev = super.byRange.put(replica.range(), replica);
+ if (prev != null)
+ {
+ super.byRange.put(replica.range(), prev); // restore prev
+ switch (ignoreConflict)
+ {
+ case DUPLICATE:
+ if (prev.equals(replica))
+ break;
+ case NONE:
+ throw new IllegalArgumentException("Conflicting replica added (expected unique ranges): " + replica + "; existing: " + prev);
+ case ALL:
+ }
+ return;
+ }
+
+ list.add(replica);
+ }
+
+ @Override
+ public Map<Range<Token>, Replica> byRange()
+ {
+ // our internal map is modifiable, but it is unsafe to modify the map externally
+ // it would be possible to implement a safe modifiable map, but it is probably not valuable
+ return Collections.unmodifiableMap(super.byRange());
+ }
+
+ public RangesAtEndpoint get(boolean isSnapshot)
+ {
+ return new RangesAtEndpoint(super.endpoint, super.list, isSnapshot, Collections.unmodifiableMap(super.byRange));
+ }
+
+ public RangesAtEndpoint asImmutableView()
+ {
+ return get(false);
+ }
+
+ public RangesAtEndpoint asSnapshot()
+ {
+ hasSnapshot = true;
+ return get(true);
+ }
+ }
+
+ public static class Builder extends ReplicaCollection.Builder<RangesAtEndpoint, Mutable, RangesAtEndpoint.Builder>
+ {
+ public Builder(InetAddressAndPort endpoint) { this(endpoint, 0); }
+ public Builder(InetAddressAndPort endpoint, int capacity) { super(new Mutable(endpoint, capacity)); }
+ }
+
+ public static RangesAtEndpoint.Builder builder(InetAddressAndPort endpoint)
+ {
+ return new RangesAtEndpoint.Builder(endpoint);
+ }
+
+ public static RangesAtEndpoint.Builder builder(InetAddressAndPort endpoint, int capacity)
+ {
+ return new RangesAtEndpoint.Builder(endpoint, capacity);
+ }
+
+ public static RangesAtEndpoint empty(InetAddressAndPort endpoint)
+ {
+ return new RangesAtEndpoint(endpoint, EMPTY_LIST, true, EMPTY_MAP);
+ }
+
+ public static RangesAtEndpoint of(Replica replica)
+ {
+ ArrayList<Replica> one = new ArrayList<>(1);
+ one.add(replica);
+ return new RangesAtEndpoint(replica.endpoint(), one, true, Collections.unmodifiableMap(Collections.singletonMap(replica.range(), replica)));
+ }
+
+ public static RangesAtEndpoint of(Replica ... replicas)
+ {
+ return copyOf(Arrays.asList(replicas));
+ }
+
+ public static RangesAtEndpoint copyOf(List<Replica> replicas)
+ {
+ if (replicas.isEmpty())
+ throw new IllegalArgumentException("Must specify a non-empty collection of replicas");
+ return builder(replicas.get(0).endpoint(), replicas.size()).addAll(replicas).build();
+ }
+
+
+ /**
+ * Use of this method to synthesize Replicas is almost always wrong. In repair it turns out the concerns of transient
+ * vs non-transient are handled at a higher level, but eventually repair needs to ask streaming to actually move
+ * the data and at that point it doesn't have a great handle on what the replicas are and it doesn't really matter.
+ *
+ * Streaming expects to be given Replicas with each replica indicating what type of data (transient or not transient)
+ * should be sent.
+ *
+ * So in this one instance we can lie to streaming and pretend all the replicas are full and use a dummy address
+ * and it doesn't matter because streaming doesn't rely on the address for anything other than debugging and full
+ * is a valid value for transientness because streaming is selecting candidate tables from the repair/unrepaired
+ * set already.
+ * @param ranges
+ * @return
+ */
+ @VisibleForTesting
+ public static RangesAtEndpoint toDummyList(Collection<Range<Token>> ranges)
+ {
+ InetAddressAndPort dummy;
+ try
+ {
+ dummy = InetAddressAndPort.getByNameOverrideDefaults("0.0.0.0", 0);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ //For repair we are less concerned with full vs transient since repair is already dealing with those concerns.
+ //Always say full and then if the repair is incremental or not will determine what is streamed.
+ return ranges.stream()
+ .map(range -> new Replica(dummy, range, true))
+ .collect(collector(dummy));
+ }
+
+ /**
+ * @return concatenate two DISJOINT collections together
+ */
+ public static RangesAtEndpoint concat(RangesAtEndpoint replicas, RangesAtEndpoint extraReplicas)
+ {
+ return AbstractReplicaCollection.concat(replicas, extraReplicas, NONE);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/RangesByEndpoint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/RangesByEndpoint.java b/src/java/org/apache/cassandra/locator/RangesByEndpoint.java
new file mode 100644
index 0000000..698b133
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/RangesByEndpoint.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A ReplicaMultimap keyed by endpoint, whose values are the RangesAtEndpoint for that endpoint.
+ */
+public class RangesByEndpoint extends ReplicaMultimap<InetAddressAndPort, RangesAtEndpoint>
+{
+    public RangesByEndpoint(Map<InetAddressAndPort, RangesAtEndpoint> map)
+    {
+        super(map);
+    }
+
+    /**
+     * @param endpoint the endpoint to look up; must not be null
+     * @return the ranges stored for the endpoint, or an empty RangesAtEndpoint for that endpoint if there are none
+     */
+    @Override
+    public RangesAtEndpoint get(InetAddressAndPort endpoint)
+    {
+        Preconditions.checkNotNull(endpoint);
+        RangesAtEndpoint ranges = map.get(endpoint);
+        // only build the empty placeholder on a miss, instead of on every lookup
+        return ranges != null ? ranges : RangesAtEndpoint.empty(endpoint);
+    }
+
+    /**
+     * Mutable counterpart whose per-endpoint values are RangesAtEndpoint.Mutable instances.
+     */
+    public static class Mutable extends ReplicaMultimap.Mutable<InetAddressAndPort, RangesAtEndpoint.Mutable>
+    {
+        @Override
+        protected RangesAtEndpoint.Mutable newMutable(InetAddressAndPort endpoint)
+        {
+            return new RangesAtEndpoint.Mutable(endpoint);
+        }
+
+        /**
+         * @return a RangesByEndpoint backed by an unmodifiable, transforming view of this map, in which each
+         * value is exposed through RangesAtEndpoint.Mutable::asImmutableView
+         */
+        public RangesByEndpoint asImmutableView()
+        {
+            return new RangesByEndpoint(Collections.unmodifiableMap(Maps.transformValues(map, RangesAtEndpoint.Mutable::asImmutableView)));
+        }
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/Replica.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Replica.java b/src/java/org/apache/cassandra/locator/Replica.java
new file mode 100644
index 0000000..37b6050
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/Replica.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A Replica represents an owning node for a copy of a portion of the token ring.
+ *
+ * It consists of:
+ * - the logical token range that is being replicated (i.e. for the first logical replica only, this will be equal
+ * to one of its owned portions of the token ring; all other replicas will have this token range also)
+ * - an endpoint (IP and port)
+ * - whether the range is replicated in full, or transiently (CASSANDRA-14404)
+ *
+ * In general, it is preferred to use a Replica to a Range<Token>, particularly when users of the concept depend on
+ * knowledge of the full/transient status of the copy.
+ *
+ * That means you should avoid unwrapping and rewrapping these things and think hard about subtraction
+ * and such and what the result is with respect to transientness. Definitely avoid creating fake Replicas with misinformation
+ * about endpoints, ranges, or transientness.
+ */
+public final class Replica implements Comparable<Replica>
+{
+ private final Range<Token> range;
+ private final InetAddressAndPort endpoint;
+ private final boolean full;
+
+ public Replica(InetAddressAndPort endpoint, Range<Token> range, boolean full)
+ {
+ Preconditions.checkNotNull(endpoint);
+ Preconditions.checkNotNull(range);
+ this.endpoint = endpoint;
+ this.range = range;
+ this.full = full;
+ }
+
+ public Replica(InetAddressAndPort endpoint, Token start, Token end, boolean full)
+ {
+ this(endpoint, new Range<>(start, end), full);
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Replica replica = (Replica) o;
+ return full == replica.full &&
+ Objects.equals(endpoint, replica.endpoint) &&
+ Objects.equals(range, replica.range);
+ }
+
+ @Override
+ public int compareTo(Replica o)
+ {
+ int c = range.compareTo(o.range);
+ if (c == 0)
+ c = endpoint.compareTo(o.endpoint);
+ if (c == 0)
+ c = Boolean.compare(full, o.full);
+ return c;
+ }
+
+ public int hashCode()
+ {
+ return Objects.hash(endpoint, range, full);
+ }
+
+ @Override
+ public String toString()
+ {
+ return (full ? "Full" : "Transient") + '(' + endpoint() + ',' + range + ')';
+ }
+
+ public final InetAddressAndPort endpoint()
+ {
+ return endpoint;
+ }
+
+ public boolean isLocal()
+ {
+ return endpoint.equals(FBUtilities.getBroadcastAddressAndPort());
+ }
+
+ public Range<Token> range()
+ {
+ return range;
+ }
+
+ public boolean isFull()
+ {
+ return full;
+ }
+
+ public final boolean isTransient()
+ {
+ return !isFull();
+ }
+
+ /**
+ * This is used exclusively in TokenMetadata to check if a portion of a range is already replicated
+ * by an endpoint so that we only mark as pending the portion that is either not replicated sufficiently (transient
+ * when we need full) or at all.
+ *
+ * If it's not replicated at all it needs to be pending because there is no data.
+ * If it's replicated but only transiently and we need to replicate it fully it must be marked as pending until it
+ * is available fully otherwise a read might treat this replica as full and not read from a full replica that has
+ * the data.
+ */
+ public RangesAtEndpoint subtractSameReplication(RangesAtEndpoint toSubtract)
+ {
+ Set<Range<Token>> subtractedRanges = range().subtractAll(toSubtract.filter(r -> r.isFull() == isFull()).ranges());
+ RangesAtEndpoint.Builder result = RangesAtEndpoint.builder(endpoint, subtractedRanges.size());
+ for (Range<Token> range : subtractedRanges)
+ {
+ result.add(decorateSubrange(range));
+ }
+ return result.build();
+ }
+
+ /**
+ * Don't use this method and ignore transient status unless you are explicitly handling it outside this method.
+ *
+ * This helper method is used by StorageService.calculateStreamAndFetchRanges to perform subtraction.
+ * It ignores transient status because it's already being handled in calculateStreamAndFetchRanges.
+ */
+ public RangesAtEndpoint subtractIgnoreTransientStatus(Range<Token> subtract)
+ {
+ Set<Range<Token>> ranges = this.range.subtract(subtract);
+ RangesAtEndpoint.Builder result = RangesAtEndpoint.builder(endpoint, ranges.size());
+ for (Range<Token> subrange : ranges)
+ result.add(decorateSubrange(subrange));
+ return result.build();
+ }
+
+ public boolean contains(Range<Token> that)
+ {
+ return range().contains(that);
+ }
+
+ public boolean intersectsOnRange(Replica replica)
+ {
+ return range().intersects(replica.range());
+ }
+
+ public Replica decorateSubrange(Range<Token> subrange)
+ {
+ Preconditions.checkArgument(range.contains(subrange));
+ return new Replica(endpoint(), subrange, isFull());
+ }
+
+ public static Replica fullReplica(InetAddressAndPort endpoint, Range<Token> range)
+ {
+ return new Replica(endpoint, range, true);
+ }
+
+ public static Replica fullReplica(InetAddressAndPort endpoint, Token start, Token end)
+ {
+ return fullReplica(endpoint, new Range<>(start, end));
+ }
+
+ public static Replica transientReplica(InetAddressAndPort endpoint, Range<Token> range)
+ {
+ return new Replica(endpoint, range, false);
+ }
+
+ public static Replica transientReplica(InetAddressAndPort endpoint, Token start, Token end)
+ {
+ return transientReplica(endpoint, new Range<>(start, end));
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/ReplicaCollection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaCollection.java b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
new file mode 100644
index 0000000..6833f4b
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * A collection like class for Replica objects. Represents both a well defined order on the contained Replica objects,
+ * and efficient methods for accessing the contained Replicas, directly and as a projection onto their endpoints and ranges.
+ */
+public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Iterable<Replica>
+{
+    /**
+     * @return a Set of the endpoints of the contained Replicas.
+     * Iteration order is maintained where there is a 1:1 relationship between endpoint and Replica
+     * Typically this collection offers O(1) access methods, and this is true for all but ReplicaList.
+     */
+    public abstract Set<InetAddressAndPort> endpoints();
+
+    /**
+     * @param i a value in the range [0..size())
+     * @return the i'th Replica, in our iteration order
+     */
+    public abstract Replica get(int i);
+
+    /**
+     * @return the number of Replica contained
+     */
+    public abstract int size();
+
+    /**
+     * @return true iff size() == 0
+     */
+    public abstract boolean isEmpty();
+
+    /**
+     * @return true iff a Replica in this collection is equal to the provided Replica.
+     * Typically this method is expected to take O(1) time, and this is true for all but ReplicaList.
+     */
+    public abstract boolean contains(Replica replica);
+
+    /**
+     * @return an *eagerly constructed* copy of this collection containing the Replica that match the provided predicate.
+     * An effort will be made to either return ourself, or a subList, where possible.
+     * It is guaranteed that no changes to any upstream Mutable will affect the state of the result.
+     */
+    public abstract C filter(Predicate<Replica> predicate);
+
+    /**
+     * @return an *eagerly constructed* copy of this collection containing the Replica that match the provided predicate.
+     * An effort will be made to either return ourself, or a subList, where possible.
+     * It is guaranteed that no changes to any upstream Mutable will affect the state of the result.
+     * Only the first maxSize items will be returned.
+     */
+    public abstract C filter(Predicate<Replica> predicate, int maxSize);
+
+    /**
+     * @return an *eagerly constructed* copy of this collection containing the Replica at positions [start..end);
+     * An effort will be made to either return ourself, or a subList, where possible.
+     * It is guaranteed that no changes to any upstream Mutable will affect the state of the result.
+     */
+    public abstract C subList(int start, int end);
+
+    /**
+     * @return an *eagerly constructed* copy of this collection containing the Replica re-ordered according to this comparator
+     * It is guaranteed that no changes to any upstream Mutable will affect the state of the result.
+     */
+    public abstract C sorted(Comparator<Replica> comparator);
+
+    public abstract Iterator<Replica> iterator();
+    public abstract Stream<Replica> stream();
+
+    public abstract boolean equals(Object o);
+    public abstract int hashCode();
+    public abstract String toString();
+
+    /**
+     * A mutable extension of a ReplicaCollection. This is append-only, so it is safe to select a subList,
+     * or at any time take an asImmutableView() snapshot.
+     */
+    public interface Mutable<C extends ReplicaCollection<C>> extends ReplicaCollection<C>
+    {
+        /**
+         * @return an Immutable clone that mirrors any modifications to this Mutable instance.
+         */
+        C asImmutableView();
+
+        /**
+         * @return an Immutable clone that assumes this Mutable will never be modified again.
+         * If this is not true, behaviour is undefined.
+         */
+        C asSnapshot();
+
+        /**
+         * Selects which conflicting additions an add may ignore rather than fail on.
+         * NOTE(review): presumably NONE ignores nothing, DUPLICATE ignores exact duplicates and ALL ignores
+         * every conflict -- the exact meaning is defined by the implementing collection; confirm there.
+         */
+        enum Conflict { NONE, DUPLICATE, ALL}
+
+        /**
+         * @param replica add this replica to the end of the collection
+         * @param ignoreConflict which conflicting additions (as defined by C's semantics) may be ignored
+         *                       instead of causing a failure
+         */
+        void add(Replica replica, Conflict ignoreConflict);
+
+        /**
+         * Adds the replica with Conflict.NONE.
+         */
+        public default void add(Replica replica)
+        {
+            add(replica, Conflict.NONE);
+        }
+
+        /**
+         * Adds each replica in iteration order, applying the same conflict setting to every addition.
+         */
+        public default void addAll(Iterable<Replica> replicas, Conflict ignoreConflicts)
+        {
+            for (Replica replica : replicas)
+                add(replica, ignoreConflicts);
+        }
+
+        /**
+         * Adds each replica in iteration order with Conflict.NONE.
+         */
+        public default void addAll(Iterable<Replica> replicas)
+        {
+            addAll(replicas, Conflict.NONE);
+        }
+    }
+
+    /**
+     * Fluent wrapper around a Mutable: additions are forwarded to it, and build() returns its asSnapshot().
+     * build() also clears the reference to the Mutable, so a Builder cannot be used after build() has been called.
+     */
+    public static class Builder<C extends ReplicaCollection<C>, M extends Mutable<C>, B extends Builder<C, M, B>>
+    {
+        Mutable<C> mutable;
+        public Builder(Mutable<C> mutable) { this.mutable = mutable; }
+
+        public int size() { return mutable.size(); }
+        public B add(Replica replica) { mutable.add(replica); return self(); }
+        public B add(Replica replica, Conflict ignoreConflict) { mutable.add(replica, ignoreConflict); return self(); }
+        public B addAll(Iterable<Replica> replica) { mutable.addAll(replica); return self(); }
+        public B addAll(Iterable<Replica> replica, Conflict ignoreConflict) { mutable.addAll(replica, ignoreConflict); return self(); }
+
+        public C build()
+        {
+            C result = mutable.asSnapshot();
+            mutable = null;
+            return result;
+        }
+
+        // The self-type bound B extends Builder<C, M, B> is what makes this cast safe for well-formed subclasses.
+        @SuppressWarnings("unchecked")
+        private B self()
+        {
+            return (B) this;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/ReplicaLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaLayout.java b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
new file mode 100644
index 0000000..946a7f8
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
+import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static com.google.common.collect.Iterables.any;
+
+/**
+ * Encapsulates knowledge about the ring necessary for performing a specific operation, with static accessors
+ * for building the relevant layout.
+ *
+ * Constitutes:
+ * - the 'natural' replicas replicating the range or token relevant for the operation
+ * - if for performing a write, any 'pending' replicas that are taking ownership of the range, and must receive updates
+ * - the 'selected' replicas, those that should be targeted for any operation
+ * - 'all' replicas represents natural+pending
+ *
+ * @param <E> the type of Endpoints this ReplicaLayout holds (either EndpointsForToken or EndpointsForRange)
+ * @param <L> the type of itself, including its type parameters, for return type of modifying methods
+ */
+public abstract class ReplicaLayout<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
+{
+ private volatile E all;
+ protected final E natural;
+ protected final E pending;
+ protected final E selected;
+
+ protected final Keyspace keyspace;
+ protected final ConsistencyLevel consistencyLevel;
+
+    /**
+     * Creates a layout without a precomputed 'all'; see the six-argument constructor for how it is derived.
+     */
+    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected)
+    {
+        this(keyspace, consistencyLevel, natural, pending, selected, null);
+    }
+
+    /**
+     * @param pending may be null, meaning the layout logically has no pending endpoints
+     * @param selected must not be null (checked by assertion)
+     * @param all natural+pending if already computed, otherwise null
+     */
+    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected, E all)
+    {
+        assert selected != null;
+        assert pending == null || !Endpoints.haveConflicts(natural, pending);
+        this.keyspace = keyspace;
+        this.consistencyLevel = consistencyLevel;
+        this.natural = natural;
+        this.pending = pending;
+        this.selected = selected;
+        // if we logically have no pending endpoints (they are null), then 'all' our endpoints are natural;
+        // otherwise keep whatever was supplied (possibly null, to be computed on demand by all())
+        this.all = (all == null && pending == null) ? natural : all;
+    }
+
+    /**
+     * Looks the endpoint up among the natural replicas only.
+     *
+     * @return whatever natural.byEndpoint().get(endpoint) yields (presumably null when the endpoint is not a
+     * natural replica -- confirm against byEndpoint())
+     */
+    public Replica getReplicaFor(InetAddressAndPort endpoint)
+    {
+        return natural.byEndpoint().get(endpoint);
+    }
+
+    /**
+     * @return the natural replicas this layout was constructed with
+     */
+    public E natural()
+    {
+        return natural;
+    }
+
+    /**
+     * @return natural+pending; built with Endpoints.concat on first use and stored in the volatile field 'all'
+     */
+    public E all()
+    {
+        E cached = all;
+        if (cached != null)
+            return cached;
+
+        E combined = Endpoints.concat(natural, pending);
+        all = combined;
+        return combined;
+    }
+
+    /**
+     * @return the selected replicas this layout was constructed with
+     */
+    public E selected()
+    {
+        return selected;
+    }
+
+    /**
+     * TODO: ideally we would enforce at compile time that read layouts have no pending to access
+     *
+     * @return the pending replicas - will be null for read layouts
+     */
+    public E pending()
+    {
+        return pending;
+    }
+
+    /**
+     * @return consistencyLevel.blockFor(keyspace) when there are no pending replicas (pending is null),
+     * otherwise consistencyLevel.blockForWrite(keyspace, pending)
+     */
+    public int blockFor()
+    {
+        if (pending == null)
+            return consistencyLevel.blockFor(keyspace);
+
+        return consistencyLevel.blockForWrite(keyspace, pending);
+    }
+
+    /**
+     * @return the keyspace this layout was built for
+     */
+    public Keyspace keyspace()
+    {
+        return keyspace;
+    }
+
+    /**
+     * @return the consistency level this layout was built for
+     */
+    public ConsistencyLevel consistencyLevel()
+    {
+        return consistencyLevel;
+    }
+
+    /**
+     * @return a layout of the same concrete type whose selected replicas are the ones provided
+     */
+    public abstract L withSelected(E replicas);
+
+    /**
+     * @return a layout of the same concrete type using the provided consistency level
+     */
+    public abstract L withConsistencyLevel(ConsistencyLevel cl);
+
+ public L forNaturalUncontacted()
+ {
+ E more;
+ if (consistencyLevel.isDatacenterLocal() && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+ {
+ IEndpointSnitch snitch = keyspace.getReplicationStrategy().snitch;
+ String localDC = DatabaseDescriptor.getLocalDataCenter();
+
+ more = natural.filter(replica -> !selected.contains(replica) &&
+ snitch.getDatacenter(replica).equals(localDC));
+ } else
+ {
+ more = natural.filter(replica -> !selected.contains(replica));
+ }
+
+ return withSelected(more);
+ }
+
+    /**
+     * A layout for an operation over a range of partition positions. It never carries pending replicas.
+     */
+    public static class ForRange extends ReplicaLayout<EndpointsForRange, ForRange>
+    {
+        public final AbstractBounds<PartitionPosition> range;
+
+        @VisibleForTesting
+        public ForRange(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange natural, EndpointsForRange selected)
+        {
+            // Range queries do not contact pending replicas, hence the null pending argument
+            super(keyspace, consistencyLevel, natural, null, selected);
+            this.range = range;
+        }
+
+        /**
+         * @return a new ForRange that differs from this one only in its selected replicas
+         */
+        @Override
+        public ForRange withSelected(EndpointsForRange newSelected)
+        {
+            return new ForRange(keyspace, consistencyLevel, range, natural, newSelected);
+        }
+
+        /**
+         * @return a new ForRange that differs from this one only in its consistency level
+         */
+        @Override
+        public ForRange withConsistencyLevel(ConsistencyLevel cl)
+        {
+            return new ForRange(keyspace, cl, range, natural, selected);
+        }
+    }
+
+    /**
+     * A layout for an operation on a single token.
+     */
+    public static class ForToken extends ReplicaLayout<EndpointsForToken, ForToken>
+    {
+        public final Token token;
+
+        /**
+         * Creates a layout without a precomputed 'all' collection.
+         */
+        @VisibleForTesting
+        public ForToken(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected)
+        {
+            super(keyspace, consistencyLevel, natural, pending, selected);
+            this.token = token;
+        }
+
+        /**
+         * Creates a layout with an explicitly supplied 'all' collection.
+         */
+        public ForToken(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected, EndpointsForToken all)
+        {
+            super(keyspace, consistencyLevel, natural, pending, selected, all);
+            this.token = token;
+        }
+
+        /**
+         * @return a new ForToken with the same keyspace, consistency level, token, natural and pending
+         * replicas, but the provided selected replicas
+         */
+        @Override
+        public ForToken withSelected(EndpointsForToken newSelected)
+        {
+            return new ForToken(keyspace, consistencyLevel, token, natural, pending, newSelected);
+        }
+
+        /**
+         * @return a new ForToken with the same keyspace, token, natural, pending and selected replicas,
+         * but the provided consistency level
+         */
+        @Override
+        public ForToken withConsistencyLevel(ConsistencyLevel cl)
+        {
+            return new ForToken(keyspace, cl, token, natural, pending, selected);
+        }
+    }
+
+    /**
+     * A ForToken layout that additionally records how many participants a Paxos operation requires.
+     */
+    public static class ForPaxos extends ForToken
+    {
+        private final int requiredParticipants;
+
+        private ForPaxos(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, int requiredParticipants, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected, EndpointsForToken all)
+        {
+            super(keyspace, consistencyLevel, token, natural, pending, selected, all);
+            this.requiredParticipants = requiredParticipants;
+        }
+
+        /**
+         * @return the number of required participants supplied at construction
+         */
+        public int getRequiredParticipants()
+        {
+            return requiredParticipants;
+        }
+    }
+
+    /**
+     * @return a ForToken layout at ConsistencyLevel.ONE in which the given replica is the only natural,
+     * selected and 'all' endpoint, and the pending endpoints are empty
+     */
+    public static ForToken forSingleReplica(Keyspace keyspace, Token token, Replica replica)
+    {
+        EndpointsForToken only = EndpointsForToken.of(token, replica);
+        EndpointsForToken noPending = EndpointsForToken.empty(token);
+        return new ForToken(keyspace, ConsistencyLevel.ONE, token, only, noPending, only, only);
+    }
+
+    /**
+     * @return a ForRange layout at ConsistencyLevel.ONE in which the given replica is the only natural
+     * and selected endpoint
+     */
+    public static ForRange forSingleReplica(Keyspace keyspace, AbstractBounds<PartitionPosition> range, Replica replica)
+    {
+        EndpointsForRange only = EndpointsForRange.of(replica);
+        return new ForRange(keyspace, ConsistencyLevel.ONE, range, only, only);
+    }
+
+    /**
+     * Builds the layout for a counter write; this is exactly the single-replica token layout.
+     */
+    public static ForToken forCounterWrite(Keyspace keyspace, Token token, Replica replica)
+    {
+        return forSingleReplica(keyspace, token, replica);
+    }
+
+    /**
+     * Builds the write layout for a batchlog write to the given endpoints. The consistency level is ONE when
+     * there is exactly one natural replica, and TWO otherwise.
+     */
+    public static ForToken forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    {
+        // The one case where we write neither for a range nor for a token, but many mutations to many tokens;
+        // the partitioner's minimum token stands in as the layout's token.
+        Token minToken = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        EndpointsForToken batchlogReplicas = EndpointsForToken.copyOf(minToken, SystemReplicas.getSystemReplicas(endpoints));
+        EndpointsForToken noPending = EndpointsForToken.empty(minToken);
+
+        ConsistencyLevel cl;
+        if (batchlogReplicas.size() == 1)
+            cl = ConsistencyLevel.ONE;
+        else
+            cl = ConsistencyLevel.TWO;
+
+        return forWriteWithDownNodes(keyspace, cl, minToken, batchlogReplicas, noPending);
+    }
+
+    /**
+     * Same as forWrite(keyspace, consistencyLevel, token, isAlive) with a predicate that accepts every endpoint.
+     */
+    public static ForToken forWriteWithDownNodes(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token) throws UnavailableException
+    {
+        return forWrite(keyspace, consistencyLevel, token, Predicates.alwaysTrue());
+    }
+
+    /**
+     * Looks up the natural and pending replicas for the token in the given keyspace, then delegates to the
+     * overload that takes them explicitly.
+     */
+    public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Predicate<InetAddressAndPort> isAlive) throws UnavailableException
+    {
+        EndpointsForToken naturalReplicas = StorageService.getNaturalReplicasForToken(keyspace.getName(), token);
+        EndpointsForToken pendingReplicas = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, keyspace.getName());
+        return forWrite(keyspace, consistencyLevel, token, naturalReplicas, pendingReplicas, isAlive);
+    }
+
+    /**
+     * Same as forWrite with explicit natural and pending replicas, using a predicate that accepts every endpoint.
+     */
+    public static ForToken forWriteWithDownNodes(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending) throws UnavailableException
+    {
+        return forWrite(keyspace, consistencyLevel, token, natural, pending, Predicates.alwaysTrue());
+    }
+
+ public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, Predicate<InetAddressAndPort> isAlive) throws UnavailableException
+ {
+ if (Endpoints.haveConflicts(natural, pending))
+ {
+ natural = Endpoints.resolveConflictsInNatural(natural, pending);
+ pending = Endpoints.resolveConflictsInPending(natural, pending);
+ }
+
+ if (!any(natural, Replica::isTransient) && !any(pending, Replica::isTransient))
+ {
+ EndpointsForToken selected = Endpoints.concat(natural, pending).filter(r -> isAlive.test(r.endpoint()));
+ return new ForToken(keyspace, consistencyLevel, token, natural, pending, selected);
+ }
+
+ return forWrite(keyspace, consistencyLevel, token, consistencyLevel.blockForWrite(keyspace, pending), natural, pending, isAlive);
+ }
+
+    /**
+     * Builds the layout for a Paxos (LWT) operation on the key's token.
+     *
+     * For LOCAL_SERIAL, natural and pending replicas are restricted to the local datacenter. The required
+     * participants are a majority of natural+pending.
+     *
+     * @throws UnavailableException if fewer than the required participants are alive, or if more than one
+     * replica is pending
+     */
+    public static ReplicaLayout.ForPaxos forPaxos(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
+    {
+        Token tk = key.getToken();
+        EndpointsForToken natural = StorageService.getNaturalReplicasForToken(keyspace.getName(), tk);
+        EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(tk, keyspace.getName());
+        if (Endpoints.haveConflicts(natural, pending))
+        {
+            natural = Endpoints.resolveConflictsInNatural(natural, pending);
+            pending = Endpoints.resolveConflictsInPending(natural, pending);
+        }
+
+        // TODO CASSANDRA-14547
+        Replicas.temporaryAssertFull(natural);
+        Replicas.temporaryAssertFull(pending);
+
+        if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
+        {
+            // Restrict natural and pending to node in the local DC only
+            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+            String localDc = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+            Predicate<Replica> inLocalDc = replica -> localDc.equals(snitch.getDatacenter(replica));
+
+            natural = natural.filter(inLocalDc);
+            pending = pending.filter(inLocalDc);
+        }
+
+        int pendingCount = pending.size();
+        int participants = pendingCount + natural.size();
+        int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833
+
+        EndpointsForToken all = Endpoints.concat(natural, pending);
+        EndpointsForToken selected = all.filter(IAsyncCallback.isReplicaAlive);
+        int liveCount = selected.size();
+        if (liveCount < requiredParticipants)
+            throw UnavailableException.create(consistencyForPaxos, requiredParticipants, liveCount);
+
+        // We cannot allow CAS operations with 2 or more pending endpoints, see #8346.
+        // Note that we fake an impossible number of required nodes in the unavailable exception
+        // to nail home the point that it's an impossible operation no matter how many nodes are live.
+        if (pendingCount > 1)
+            throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", pendingCount),
+                                           consistencyForPaxos,
+                                           participants + 1,
+                                           liveCount);
+
+        return new ReplicaLayout.ForPaxos(keyspace, consistencyForPaxos, tk, requiredParticipants, natural, pending, selected, all);
+    }
+
+ /**
+ * We want to send mutations to as many full replicas as we can, and just as many transient replicas
+ * as we need to meet blockFor.
+ *
+ * The candidates are natural + pending. The selection first adds every full replica accepted by
+ * livePredicate, then adds transient replicas accepted by livePredicate with blockFor passed as the second
+ * argument of that step (presumably a target for the total selected -- confirm against select()).
+ * The selection is then handed to consistencyLevel.assureSufficientLiveNodesForWrite before the layout is built.
+ *
+ * @param blockFor the number of replicas the write must block for
+ * @param livePredicate decides whether an endpoint may be selected
+ * @return a ForToken layout carrying natural, pending, the selection and the precomputed natural+pending
+ * @throws UnavailableException presumably raised by assureSufficientLiveNodesForWrite when the selection is
+ * insufficient for the consistency level -- confirm
+ */
+ @VisibleForTesting
+ public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, int blockFor, EndpointsForToken natural, EndpointsForToken pending, Predicate<InetAddressAndPort> livePredicate) throws UnavailableException
+ {
+ EndpointsForToken all = Endpoints.concat(natural, pending);
+ EndpointsForToken selected = all
+ .select()
+ .add(r -> r.isFull() && livePredicate.test(r.endpoint()))
+ .add(r -> r.isTransient() && livePredicate.test(r.endpoint()), blockFor)
+ .get();
+
+ consistencyLevel.assureSufficientLiveNodesForWrite(keyspace, selected, pending);
+
+ return new ForToken(keyspace, consistencyLevel, token, natural, pending, selected, all);
+ }
+
+ public static ForToken forRead(Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
+ {
+ EndpointsForToken natural = StorageProxy.getLiveSortedReplicasForToken(keyspace, token);
+ EndpointsForToken selected = consistencyLevel.filterForQuery(keyspace, natural, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE));
+
+ // Throw UAE early if we don't have enough replicas.
+ consistencyLevel.assureSufficientLiveNodesForRead(keyspace, selected);
+
+ return new ForToken(keyspace, consistencyLevel, token, natural, null, selected);
+ }
+
+    /**
+     * @return a ForRange layout built directly from the supplied natural and selected replicas
+     */
+    public static ForRange forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange natural, EndpointsForRange selected)
+    {
+        return new ForRange(keyspace, consistencyLevel, range, natural, selected);
+    }
+
+    /**
+     * @return a one-line summary of the consistency level, keyspace, and natural, pending and selected replicas
+     */
+    @Override
+    public String toString()
+    {
+        // each label is preceded by a space so that consecutive fields do not run together
+        return "ReplicaLayout [ CL: " + consistencyLevel + " keyspace: " + keyspace + " natural: " + natural + " pending: " + pending + " selected: " + selected + " ]";
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/ReplicaMultimap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaMultimap.java b/src/java/org/apache/cassandra/locator/ReplicaMultimap.java
new file mode 100644
index 0000000..3e3fcb4
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReplicaMultimap.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+ /**
+  * A map from keys of type {@code K} to replica collections of type {@code C}, with helpers for viewing
+  * the contents as a flat sequence of replicas or of (key, replica) pairs.
+  *
+  * Equality, hash code and string form are all delegated to the backing map.
+  */
+ public abstract class ReplicaMultimap<K, C extends ReplicaCollection<?>>
+ {
+     /** Backing storage; exposed directly (not copied) by {@link #asMap()}, {@link #keySet()} and {@link #entrySet()}. */
+     final Map<K, C> map;
+
+     ReplicaMultimap(Map<K, C> map)
+     {
+         this.map = map;
+     }
+
+     /** Returns the collection for {@code key}; behaviour for an absent key is defined by the subclass. */
+     public abstract C get(K key);
+
+     /** Returns the collection stored for {@code key}, or {@code null} if the backing map has none. */
+     public C getIfPresent(K key)
+     {
+         return map.get(key);
+     }
+
+     /** Returns all replicas of all collections as one concatenated iterable. */
+     public Iterable<Replica> flattenValues()
+     {
+         return Iterables.concat(map.values());
+     }
+
+     /**
+      * Returns an iterable of immutable (key, replica) entries, one per replica in each key's collection.
+      * Each call to {@code iterator()} builds a fresh stream over the backing map.
+      */
+     public Iterable<Map.Entry<K, Replica>> flattenEntries()
+     {
+         return () -> {
+             Stream<Map.Entry<K, Replica>> pairs =
+                 map.entrySet()
+                    .stream()
+                    .flatMap(keyed -> keyed.getValue()
+                                           .stream()
+                                           .map(member -> (Map.Entry<K, Replica>) new AbstractMap.SimpleImmutableEntry<>(keyed.getKey(), member)));
+             return pairs.iterator();
+         };
+     }
+
+     public boolean isEmpty()
+     {
+         return map.isEmpty();
+     }
+
+     public boolean containsKey(Object key)
+     {
+         return map.containsKey(key);
+     }
+
+     public Set<K> keySet()
+     {
+         return map.keySet();
+     }
+
+     public Set<Map.Entry<K, C>> entrySet()
+     {
+         return map.entrySet();
+     }
+
+     public Map<K, C> asMap()
+     {
+         return map;
+     }
+
+     @Override
+     public boolean equals(Object o)
+     {
+         if (o == this)
+             return true;
+         // only instances of exactly the same class can be equal
+         if (o == null || o.getClass() != getClass())
+             return false;
+         ReplicaMultimap<?, ?> other = (ReplicaMultimap<?, ?>) o;
+         return Objects.equals(map, other.map);
+     }
+
+     @Override
+     public int hashCode()
+     {
+         return map.hashCode();
+     }
+
+     @Override
+     public String toString()
+     {
+         return map.toString();
+     }
+
+     /**
+      * A multimap backed by a {@link HashMap} whose per-key collections are created on first access
+      * via {@link #newMutable(Object)}.
+      */
+     public static abstract class Mutable
+            <K, MutableCollection extends ReplicaCollection.Mutable<?>>
+            extends ReplicaMultimap<K, MutableCollection>
+     {
+         /** Creates the empty collection that will be stored for {@code key}. */
+         protected abstract MutableCollection newMutable(K key);
+
+         Mutable()
+         {
+             super(new HashMap<>());
+         }
+
+         /** Returns the collection for {@code key}, creating and storing one if none exists yet. */
+         @Override
+         public MutableCollection get(K key)
+         {
+             Preconditions.checkNotNull(key);
+             return map.computeIfAbsent(key, this::newMutable);
+         }
+
+         /** Adds {@code replica} to the collection for {@code key}; neither argument may be null. */
+         public void put(K key, Replica replica)
+         {
+             Preconditions.checkNotNull(key);
+             Preconditions.checkNotNull(replica);
+             MutableCollection target = get(key);
+             target.add(replica);
+         }
+     }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/Replicas.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Replicas.java b/src/java/org/apache/cassandra/locator/Replicas.java
new file mode 100644
index 0000000..299e6ec
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/Replicas.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.Iterables;
+
+import static com.google.common.collect.Iterables.all;
+
+ /**
+  * Static helpers for counting, checking and printing collections of {@link Replica}.
+  */
+ public class Replicas
+ {
+
+     /** Returns how many replicas in {@code liveReplicas} report {@code isFull()}. */
+     public static int countFull(ReplicaCollection<?> liveReplicas)
+     {
+         int full = 0;
+         for (Replica candidate : liveReplicas)
+         {
+             if (candidate.isFull())
+                 full++;
+         }
+         return full;
+     }
+
+     /**
+      * A placeholder for areas of the code that cannot yet handle transient replicas, but should do so in future
+      *
+      * @throws UnsupportedOperationException if {@code replica} is not full
+      */
+     public static void temporaryAssertFull(Replica replica)
+     {
+         if (replica.isFull())
+             return;
+
+         throw new UnsupportedOperationException("transient replicas are currently unsupported: " + replica);
+     }
+
+     /**
+      * A placeholder for areas of the code that cannot yet handle transient replicas, but should do so in future
+      *
+      * @throws UnsupportedOperationException if any of {@code replicas} is not full
+      */
+     public static void temporaryAssertFull(Iterable<Replica> replicas)
+     {
+         for (Replica candidate : replicas)
+         {
+             // stop at the first non-full replica; the message lists the whole input
+             if (!candidate.isFull())
+                 throw new UnsupportedOperationException("transient replicas are currently unsupported: " + Iterables.toString(replicas));
+         }
+     }
+
+     /**
+      * For areas of the code that should never see a transient replica
+      *
+      * @throws UnsupportedOperationException if any of {@code replicas} is not full
+      */
+     public static void assertFull(Iterable<Replica> replicas)
+     {
+         for (Replica candidate : replicas)
+         {
+             // stop at the first non-full replica; the message lists the whole input
+             if (!candidate.isFull())
+                 throw new UnsupportedOperationException("transient replicas are currently unsupported: " + Iterables.toString(replicas));
+         }
+     }
+
+     /**
+      * Returns the host address of every replica's endpoint, in iteration order.
+      *
+      * @param withPort forwarded unchanged to {@code getHostAddress}
+      */
+     public static List<String> stringify(ReplicaCollection<?> replicas, boolean withPort)
+     {
+         List<String> addresses = new ArrayList<>(replicas.size());
+         for (Replica member : replicas)
+             addresses.add(member.endpoint().getHostAddress(withPort));
+         return addresses;
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/ReplicationFactor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicationFactor.java b/src/java/org/apache/cassandra/locator/ReplicationFactor.java
new file mode 100644
index 0000000..c0ed31f
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReplicationFactor.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class ReplicationFactor
+{
+ public static final ReplicationFactor ZERO = new ReplicationFactor(0);
+
+ public final int allReplicas;
+ public final int fullReplicas;
+
+ private ReplicationFactor(int allReplicas, int transientReplicas)
+ {
+ validate(allReplicas, transientReplicas);
+ this.allReplicas = allReplicas;
+ this.fullReplicas = allReplicas - transientReplicas;
+ }
+
+ public int transientReplicas()
+ {
+ return allReplicas - fullReplicas;
+ }
+
+ public boolean hasTransientReplicas()
+ {
+ return allReplicas != fullReplicas;
+ }
+
+ private ReplicationFactor(int allReplicas)
+ {
+ this(allReplicas, 0);
+ }
+
+ static void validate(int totalRF, int transientRF)
+ {
+ Preconditions.checkArgument(transientRF == 0 || DatabaseDescriptor.isTransientReplicationEnabled(),
+ "Transient replication is not enabled on this node");
+ Preconditions.checkArgument(totalRF >= 0,
+ "Replication factor must be non-negative, found %s", totalRF);
+ Preconditions.checkArgument(transientRF == 0 || transientRF < totalRF,
+ "Transient replicas must be zero, or less than total replication factor. For %s/%s", totalRF, transientRF);
+ if (transientRF > 0)
+ {
+ Preconditions.checkArgument(DatabaseDescriptor.getNumTokens() == 1,
+ "Transient nodes are not allowed with multiple tokens");
+ Stream<InetAddressAndPort> endpoints = Stream.concat(Gossiper.instance.getLiveMembers().stream(), Gossiper.instance.getUnreachableMembers().stream());
+ List<InetAddressAndPort> badVersionEndpoints = endpoints.filter(Predicates.not(FBUtilities.getBroadcastAddressAndPort()::equals))
+ .filter(endpoint -> Gossiper.instance.getReleaseVersion(endpoint) != null && Gossiper.instance.getReleaseVersion(endpoint).major < 4)
+ .collect(Collectors.toList());
+ if (!badVersionEndpoints.isEmpty())
+ throw new AssertionError("Transient replication is not supported in mixed version clusters with nodes < 4.0. Bad nodes: " + badVersionEndpoints);
+ }
+ else if (transientRF < 0)
+ {
+ throw new AssertionError(String.format("Amount of transient nodes should be strictly positive, but was: '%d'", transientRF));
+ }
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ReplicationFactor that = (ReplicationFactor) o;
+ return allReplicas == that.allReplicas && fullReplicas == that.fullReplicas;
+ }
+
+ public int hashCode()
+ {
+ return Objects.hash(allReplicas, fullReplicas);
+ }
+
+ public static ReplicationFactor fullOnly(int totalReplicas)
+ {
+ return new ReplicationFactor(totalReplicas);
+ }
+
+ public static ReplicationFactor withTransient(int totalReplicas, int transientReplicas)
+ {
+ return new ReplicationFactor(totalReplicas, transientReplicas);
+ }
+
+ public static ReplicationFactor fromString(String s)
+ {
+ if (s.contains("/"))
+ {
+ String[] parts = s.split("/");
+ Preconditions.checkArgument(parts.length == 2,
+ "Replication factor format is <replicas> or <replicas>/<transient>");
+ return new ReplicationFactor(Integer.valueOf(parts[0]), Integer.valueOf(parts[1]));
+ }
+ else
+ {
+ return new ReplicationFactor(Integer.valueOf(s), 0);
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "rf(" + allReplicas + (hasTransientReplicas() ? '/' + transientReplicas() : "") + ')';
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/SimpleSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SimpleSnitch.java b/src/java/org/apache/cassandra/locator/SimpleSnitch.java
index e31fc6b..d605b6e 100644
--- a/src/java/org/apache/cassandra/locator/SimpleSnitch.java
+++ b/src/java/org/apache/cassandra/locator/SimpleSnitch.java
@@ -17,8 +17,6 @@
*/
package org.apache.cassandra.locator;
-import java.util.List;
-
/**
* A simple endpoint snitch implementation that treats Strategy order as proximity,
* allowing non-read-repaired reads to prefer a single endpoint, which improves
@@ -37,12 +35,14 @@ public class SimpleSnitch extends AbstractEndpointSnitch
}
@Override
- public void sortByProximity(final InetAddressAndPort address, List<InetAddressAndPort> addresses)
+ public <C extends ReplicaCollection<? extends C>> C sortedByProximity(final InetAddressAndPort address, C unsortedAddress)
{
// Optimization to avoid walking the list
+ return unsortedAddress;
}
- public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
+ @Override
+ public int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2)
{
// Making all endpoints equal ensures we won't change the original ordering (since
// Collections.sort is guaranteed to be stable)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org