You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:36:06 UTC

[14/18] cassandra git commit: Transient Replication and Cheap Quorums

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index db73b4f..8eb8603 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -21,6 +21,9 @@ import java.util.Collection;
 import java.util.Set;
 import java.util.UUID;
 
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.db.RowIndexEntry;
@@ -85,13 +88,15 @@ public class BigFormat implements SSTableFormat
                                   long keyCount,
                                   long repairedAt,
                                   UUID pendingRepair,
+                                  boolean isTransient,
                                   TableMetadataRef metadata,
                                   MetadataCollector metadataCollector,
                                   SerializationHeader header,
                                   Collection<SSTableFlushObserver> observers,
                                   LifecycleTransaction txn)
         {
-            return new BigTableWriter(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers, txn);
+            SSTable.validateRepairedMetadata(repairedAt, pendingRepair, isTransient);
+            return new BigTableWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers, txn);
         }
     }
 
@@ -120,7 +125,7 @@ public class BigFormat implements SSTableFormat
         // mb (3.0.7, 3.7): commit log lower bound included
         // mc (3.0.8, 3.9): commit log intervals included
 
-        // na (4.0.0): uncompressed chunks, pending repair session, checksummed sstable metadata file, new Bloomfilter format
+        // na (4.0.0): uncompressed chunks, pending repair session, isTransient, checksummed sstable metadata file, new Bloomfilter format
         //
         // NOTE: when adding a new version, please add that to LegacySSTableTest, too.
 
@@ -131,6 +136,7 @@ public class BigFormat implements SSTableFormat
         public final boolean hasMaxCompressedLength;
         private final boolean hasPendingRepair;
         private final boolean hasMetadataChecksum;
+        private final boolean hasIsTransient;
         /**
          * CASSANDRA-9067: 4.0 bloom filter representation changed (two longs just swapped)
          * have no 'static' bits caused by using the same upper bits for both bloom filter and token distribution.
@@ -148,6 +154,7 @@ public class BigFormat implements SSTableFormat
             hasCommitLogIntervals = version.compareTo("mc") >= 0;
             hasMaxCompressedLength = version.compareTo("na") >= 0;
             hasPendingRepair = version.compareTo("na") >= 0;
+            hasIsTransient = version.compareTo("na") >= 0;
             hasMetadataChecksum = version.compareTo("na") >= 0;
             hasOldBfFormat = version.compareTo("na") < 0;
         }
@@ -176,6 +183,12 @@ public class BigFormat implements SSTableFormat
         }
 
         @Override
+        public boolean hasIsTransient() // whether this sstable version carries the isTransient flag (set for versions >= "na")
+        {
+            return hasIsTransient;
+        }
+
+        @Override
         public int correspondingMessagingVersion()
         {
             return correspondingMessagingVersion;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index b5488ed..7513e95 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -68,13 +68,14 @@ public class BigTableWriter extends SSTableWriter
                           long keyCount,
                           long repairedAt,
                           UUID pendingRepair,
+                          boolean isTransient,
                           TableMetadataRef metadata,
                           MetadataCollector metadataCollector, 
                           SerializationHeader header,
                           Collection<SSTableFlushObserver> observers,
                           LifecycleTransaction txn)
     {
-        super(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers);
+        super(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers);
         txn.trackNew(this); // must track before any files are created
 
         if (compression)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
index 6a40d94..eb7b2c7 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
@@ -71,7 +71,7 @@ public interface IMetadataSerializer
     void mutateLevel(Descriptor descriptor, int newLevel) throws IOException;
 
     /**
-     * Mutate the repairedAt time and pendingRepair ID
+     * Mutate the repairedAt time, pendingRepair ID, and transient status
      */
-    void mutateRepaired(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair) throws IOException;
+    void mutateRepairMetadata(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 9d9c1a8..36c218b 100755
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -83,7 +83,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
                                  ActiveRepairService.UNREPAIRED_SSTABLE,
                                  -1,
                                  -1,
-                                 null);
+                                 null,
+                                 false);
     }
 
     protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram();
@@ -272,7 +273,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
         this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards;
     }
 
-    public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, UUID pendingRepair, SerializationHeader header)
+    public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, UUID pendingRepair, boolean isTransient, SerializationHeader header)
     {
         Map<MetadataType, MetadataComponent> components = new EnumMap<>(MetadataType.class);
         components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
@@ -294,7 +295,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
                                                              repairedAt,
                                                              totalColumnsSet,
                                                              totalRows,
-                                                             pendingRepair));
+                                                             pendingRepair,
+                                                             isTransient));
         components.put(MetadataType.COMPACTION, new CompactionMetadata(cardinality));
         components.put(MetadataType.HEADER, header.toComponent());
         return components;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index 74923a0..f76db2d 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -230,7 +230,7 @@ public class MetadataSerializer implements IMetadataSerializer
         rewriteSSTableMetadata(descriptor, currentComponents);
     }
 
-    public void mutateRepaired(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair) throws IOException
+    public void mutateRepairMetadata(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException
     {
         if (logger.isTraceEnabled())
             logger.trace("Mutating {} to repairedAt time {} and pendingRepair {}",
@@ -238,7 +238,7 @@ public class MetadataSerializer implements IMetadataSerializer
         Map<MetadataType, MetadataComponent> currentComponents = deserialize(descriptor, EnumSet.allOf(MetadataType.class));
         StatsMetadata stats = (StatsMetadata) currentComponents.remove(MetadataType.STATS);
         // mutate time & id
-        currentComponents.put(MetadataType.STATS, stats.mutateRepairedAt(newRepairedAt).mutatePendingRepair(newPendingRepair));
+        currentComponents.put(MetadataType.STATS, stats.mutateRepairedMetadata(newRepairedAt, newPendingRepair, isTransient));
         rewriteSSTableMetadata(descriptor, currentComponents);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index 2b8ebef..f14fb5d 100755
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -64,6 +64,7 @@ public class StatsMetadata extends MetadataComponent
     public final long totalColumnsSet;
     public final long totalRows;
     public final UUID pendingRepair;
+    public final boolean isTransient;
 
     public StatsMetadata(EstimatedHistogram estimatedPartitionSize,
                          EstimatedHistogram estimatedColumnCount,
@@ -83,7 +84,8 @@ public class StatsMetadata extends MetadataComponent
                          long repairedAt,
                          long totalColumnsSet,
                          long totalRows,
-                         UUID pendingRepair)
+                         UUID pendingRepair,
+                         boolean isTransient)
     {
         this.estimatedPartitionSize = estimatedPartitionSize;
         this.estimatedColumnCount = estimatedColumnCount;
@@ -104,6 +106,7 @@ public class StatsMetadata extends MetadataComponent
         this.totalColumnsSet = totalColumnsSet;
         this.totalRows = totalRows;
         this.pendingRepair = pendingRepair;
+        this.isTransient = isTransient;
     }
 
     public MetadataType getType()
@@ -155,10 +158,11 @@ public class StatsMetadata extends MetadataComponent
                                  repairedAt,
                                  totalColumnsSet,
                                  totalRows,
-                                 pendingRepair);
+                                 pendingRepair,
+                                 isTransient);
     }
 
-    public StatsMetadata mutateRepairedAt(long newRepairedAt)
+    public StatsMetadata mutateRepairedMetadata(long newRepairedAt, UUID newPendingRepair, boolean newIsTransient)
     {
         return new StatsMetadata(estimatedPartitionSize,
                                  estimatedColumnCount,
@@ -178,30 +182,8 @@ public class StatsMetadata extends MetadataComponent
                                  newRepairedAt,
                                  totalColumnsSet,
                                  totalRows,
-                                 pendingRepair);
-    }
-
-    public StatsMetadata mutatePendingRepair(UUID newPendingRepair)
-    {
-        return new StatsMetadata(estimatedPartitionSize,
-                                 estimatedColumnCount,
-                                 commitLogIntervals,
-                                 minTimestamp,
-                                 maxTimestamp,
-                                 minLocalDeletionTime,
-                                 maxLocalDeletionTime,
-                                 minTTL,
-                                 maxTTL,
-                                 compressionRatio,
-                                 estimatedTombstoneDropTime,
-                                 sstableLevel,
-                                 minClusteringValues,
-                                 maxClusteringValues,
-                                 hasLegacyCounterShards,
-                                 repairedAt,
-                                 totalColumnsSet,
-                                 totalRows,
-                                 newPendingRepair);
+                                 newPendingRepair,
+                                 newIsTransient);
     }
 
     @Override
@@ -292,6 +274,12 @@ public class StatsMetadata extends MetadataComponent
                 if (component.pendingRepair != null)
                     size += UUIDSerializer.serializer.serializedSize(component.pendingRepair, 0);
             }
+
+            if (version.hasIsTransient())
+            {
+                size += TypeSizes.sizeof(component.isTransient);
+            }
+
             return size;
         }
 
@@ -338,6 +326,11 @@ public class StatsMetadata extends MetadataComponent
                     out.writeByte(0);
                 }
             }
+
+            if (version.hasIsTransient())
+            {
+                out.writeBoolean(component.isTransient);
+            }
         }
 
         public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException
@@ -386,6 +379,8 @@ public class StatsMetadata extends MetadataComponent
                 pendingRepair = UUIDSerializer.serializer.deserialize(in, 0);
             }
 
+            boolean isTransient = version.hasIsTransient() && in.readBoolean();
+
             return new StatsMetadata(partitionSizes,
                                      columnCounts,
                                      commitLogIntervals,
@@ -404,7 +399,8 @@ public class StatsMetadata extends MetadataComponent
                                      repairedAt,
                                      totalColumnsSet,
                                      totalRows,
-                                     pendingRepair);
+                                     pendingRepair,
+                                     isTransient);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
index 2ee8eea..2e7408b 100644
--- a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
@@ -17,13 +17,12 @@
  */
 package org.apache.cassandra.locator;
 
-import java.util.*;
-
+import com.google.common.collect.Iterables;
 import org.apache.cassandra.config.DatabaseDescriptor;
 
 public abstract class AbstractEndpointSnitch implements IEndpointSnitch
 {
-    public abstract int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2);
+    public abstract int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2); // comparator result used by sortedByProximity to order replicas relative to target
 
     /**
      * Sorts the <tt>Collection</tt> of node addresses by proximity to the given address
@@ -31,27 +30,9 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch
      * @param unsortedAddress the nodes to sort
      * @return a new sorted <tt>List</tt>
      */
-    public List<InetAddressAndPort> getSortedListByProximity(InetAddressAndPort address, Collection<InetAddressAndPort> unsortedAddress)
-    {
-        List<InetAddressAndPort> preferred = new ArrayList<>(unsortedAddress);
-        sortByProximity(address, preferred);
-        return preferred;
-    }
-
-    /**
-     * Sorts the <tt>List</tt> of node addresses, in-place, by proximity to the given address
-     * @param address the address to sort the proximity by
-     * @param addresses the nodes to sort
-     */
-    public void sortByProximity(final InetAddressAndPort address, List<InetAddressAndPort> addresses)
+    public <C extends ReplicaCollection<? extends C>> C sortedByProximity(final InetAddressAndPort address, C unsortedAddress)
     {
-        Collections.sort(addresses, new Comparator<InetAddressAndPort>()
-        {
-            public int compare(InetAddressAndPort a1, InetAddressAndPort a2)
-            {
-                return compareEndpoints(address, a1, a2);
-            }
-        });
+        return unsortedAddress.sorted((r1, r2) -> compareEndpoints(address, r1, r2)); // returns whatever sorted(...) produces, ordered by compareEndpoints relative to address
     }
 
     public void gossiperStarting()
@@ -59,7 +40,7 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch
         // noop by default
     }
 
-    public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2)
+    public boolean isWorthMergingForRangeQuery(ReplicaCollection<?> merged, ReplicaCollection<?> l1, ReplicaCollection<?> l2)
     {
         // Querying remote DC is likely to be an order of magnitude slower than
         // querying locally, so 2 queries to local nodes is likely to still be
@@ -70,14 +51,9 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch
              : true;
     }
 
-    private boolean hasRemoteNode(List<InetAddressAndPort> l)
+    private boolean hasRemoteNode(ReplicaCollection<?> l)
     {
         String localDc = DatabaseDescriptor.getLocalDataCenter();
-        for (InetAddressAndPort ep : l)
-        {
-            if (!localDc.equals(getDatacenter(ep)))
-                return true;
-        }
-        return false;
+        return Iterables.any(l, replica -> !localDc.equals(getDatacenter(replica))); // true if any replica's datacenter differs from the local one
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java b/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java
index e91f6ac..08c41f0 100644
--- a/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java
+++ b/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java
@@ -37,8 +37,11 @@ public abstract class AbstractNetworkTopologySnitch extends AbstractEndpointSnit
      */
     abstract public String getDatacenter(InetAddressAndPort endpoint);
 
-    public int compareEndpoints(InetAddressAndPort address, InetAddressAndPort a1, InetAddressAndPort a2)
+    @Override
+    public int compareEndpoints(InetAddressAndPort address, Replica r1, Replica r2)
     {
+        InetAddressAndPort a1 = r1.endpoint();
+        InetAddressAndPort a2 = r2.endpoint();
         if (address.equals(a1) && !address.equals(a2))
             return -1;
         if (address.equals(a2) && !address.equals(a1))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
new file mode 100644
index 0000000..ecf1296
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.collect.Iterables;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Stream;
+
+/**
+ * A collection-like class for Replica objects. Since the Replica class contains an inet address, a range, and
+ * transient replication status, basic contains and remove methods can be ambiguous. Replica collections force you
+ * to be explicit about what you're checking the container for, or removing from it.
+ */
+public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollection<C>> implements ReplicaCollection<C>
+{
+    protected static final List<Replica> EMPTY_LIST = new ArrayList<>(); // shared empty list, kept as an ArrayList to avoid megamorphic callsites; it is a mutable ArrayList, so it must never be modified - NOTE(review): confirm no holder of it ever mutates it
+
+    public static <C extends ReplicaCollection<C>, B extends Builder<C, ?, B>> Collector<Replica, B, C> collector(Set<Collector.Characteristics> characteristics, Supplier<B> supplier)
+    { // builds a stream Collector that accumulates Replica elements into a builder from supplier and finishes with Builder::build
+        return new Collector<Replica, B, C>()
+        {
+            private final BiConsumer<B, Replica> accumulator = Builder::add;
+            private final BinaryOperator<B> combiner = (a, b) -> { a.addAll(b.mutable); return a; }; // merges b's contents (b.mutable) into a and keeps a
+            private final Function<B, C> finisher = Builder::build;
+            public Supplier<B> supplier() { return supplier; }
+            public BiConsumer<B, Replica> accumulator() { return accumulator; }
+            public BinaryOperator<B> combiner() { return combiner; }
+            public Function<B, C> finisher() { return finisher; }
+            public Set<Characteristics> characteristics() { return characteristics; } // reported exactly as passed in by the caller
+        };
+    }
+
+    protected final List<Replica> list; // backing list; stored by reference, never copied by the constructor
+    protected final boolean isSnapshot; // when true, snapshot() returns self() and subList() may share the backing list instead of copying it
+    protected AbstractReplicaCollection(List<Replica> list, boolean isSnapshot)
+    {
+        this.list = list;
+        this.isSnapshot = isSnapshot;
+    }
+
+    // builds a C over the given list (callers in this class pass copies, sub-lists or EMPTY_LIST); if subList == null, should return self (or a clone thereof)
+    protected abstract C snapshot(List<Replica> subList);
+    protected abstract C self(); // presumably returns this, typed as C - confirm in subclasses
+    /**
+     * Construct a new Mutable of our own type, so that we can concatenate.
+     * TODO: this isn't terribly pretty, but we sometimes need to select / merge two Endpoints of unknown type;
+     */
+    public abstract Mutable<C> newMutable(int initialCapacity);
+
+
+    public C snapshot()
+    { // returns self() when already a snapshot; otherwise snapshot(...) over a fresh copy of the backing list
+        return isSnapshot ? self()
+                          : snapshot(list.isEmpty() ? EMPTY_LIST // empty collections share EMPTY_LIST instead of allocating
+                                                    : new ArrayList<>(list));
+    }
+
+    public final C subList(int start, int end)
+    { // returns the elements in [start, end) as a C; shares storage when this is a snapshot, copies otherwise
+        List<Replica> subList;
+        if (isSnapshot)
+        {
+            if (start == 0 && end == size()) return self(); // the full range of a snapshot is the snapshot itself
+            else if (start == end) subList = EMPTY_LIST;
+            else subList = list.subList(start, end); // a view over the backing list, not a copy
+        }
+        else
+        {
+            if (start == end) subList = EMPTY_LIST;
+            else subList = new ArrayList<>(list.subList(start, end)); // TODO: we could take a subList here, but comodification checks stop us
+        }
+        return snapshot(subList);
+    }
+
+    public final C filter(Predicate<Replica> predicate)
+    {
+        return filter(predicate, Integer.MAX_VALUE); // no effective limit: delegates to filter(predicate, limit)
+    }
+
+    public final C filter(Predicate<Replica> predicate, int limit) // keeps at most `limit` replicas matching predicate, in list order
+    {
+        if (isEmpty() || limit <= 0) // a non-positive limit selects nothing; it would otherwise never reach the --limit == 0 stop below
+            return isEmpty() ? snapshot() : subList(0, 0);
+
+        List<Replica> copy = null; // allocated only once the matches stop being a single contiguous run
+        int beginRun = -1, endRun = -1; // [beginRun, endRun) is the first contiguous run of matches; -1 means not yet known
+        int i = 0;
+        for (; i < list.size() ; ++i)
+        {
+            Replica replica = list.get(i);
+            if (predicate.test(replica))
+            {
+                if (copy != null)
+                    copy.add(replica);
+                else if (beginRun < 0)
+                    beginRun = i;
+                else if (endRun > 0) // a match after the first run ended: fall back to copying
+                {
+                    copy = new ArrayList<>((endRun - beginRun) + Math.min(limit, list.size() - i)); // run already seen plus at most the remaining budget (limit still counts this match)
+                    for (int j = beginRun ; j < endRun ; ++j)
+                        copy.add(list.get(j));
+                    copy.add(list.get(i));
+                }
+                if (--limit == 0) // budget exhausted by this match
+                {
+                    ++i; // so that i is the exclusive end of an unfinished run
+                    break;
+                }
+            }
+            else if (beginRun >= 0 && endRun < 0)
+                endRun = i; // first non-match after the run started closes the run
+        }
+
+        if (beginRun < 0)
+            beginRun = endRun = 0; // nothing matched: empty range
+        if (endRun < 0)
+            endRun = i; // the run extended to where iteration stopped
+        if (copy == null)
+            return subList(beginRun, endRun); // single contiguous run: no element-by-element copy needed here
+        return snapshot(copy);
+    }
+
+    public final class Select // accumulates replicas picked from the enclosing collection's list by successive predicates
+    {
+        private final List<Replica> result;
+        public Select(int expectedSize)
+        {
+            this.result = new ArrayList<>(expectedSize); // expectedSize is only an initial capacity
+        }
+
+        /**
+         * Add matching replica to the result; this predicate should be mutually exclusive with all prior predicates.
+         * Stop once we have targetSize replicas in total, including preceding calls
+         */
+        public Select add(Predicate<Replica> predicate, int targetSize)
+        {
+            assert !Iterables.any(result, predicate::test); // checks the mutual-exclusion contract: nothing already selected may match this predicate
+            for (int i = 0 ; result.size() < targetSize && i < list.size() ; ++i)
+                if (predicate.test(list.get(i)))
+                    result.add(list.get(i));
+            return this;
+        }
+        public Select add(Predicate<Replica> predicate)
+        {
+            return add(predicate, Integer.MAX_VALUE); // no effective target size: take every match
+        }
+        public C get()
+        {
+            return snapshot(result); // hands the accumulated list itself to snapshot(...), without copying it first
+        }
+    }
+
+    /**
+     * An efficient method for selecting a subset of replicas via a sequence of filters.
+     *
+     * Example: select().add(filter1).add(filter2, 3).get();
+     *
+     * @return a Select object
+     */
+    public final Select select()
+    {
+        return select(list.size()); // expected size defaults to the size of the whole collection
+    }
+    public final Select select(int expectedSize)
+    {
+        return new Select(expectedSize);
+    }
+
+    public final C sorted(Comparator<Replica> comparator)
+    {
+        List<Replica> copy = new ArrayList<>(list); // sort a copy; the backing list is left untouched
+        copy.sort(comparator);
+        return snapshot(copy);
+    }
+
+    public final Replica get(int i)
+    {
+        return list.get(i); // positional access, delegated to the backing list
+    }
+
+    public final int size()
+    {
+        return list.size();
+    }
+
+    public final boolean isEmpty()
+    {
+        return list.isEmpty();
+    }
+
+    public final Iterator<Replica> iterator()
+    {
+        return list.iterator(); // the backing list's own iterator, not one over a copy
+    }
+
+    public final Stream<Replica> stream() { return list.stream(); }
+
+    public final boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (!(o instanceof AbstractReplicaCollection<?>))
+        {
+            if (!(o instanceof ReplicaCollection<?>))
+                return false;
+
+            ReplicaCollection<?> that = (ReplicaCollection<?>) o;
+            return Iterables.elementsEqual(this, that); // other ReplicaCollection implementations: compare element by element in iteration order - NOTE(review): symmetric only if their equals does the same
+        }
+        AbstractReplicaCollection<?> that = (AbstractReplicaCollection<?>) o;
+        return Objects.equals(list, that.list); // same family: compare backing lists directly (isSnapshot is not part of equality)
+    }
+
+    public final int hashCode()
+    {
+        return list.hashCode(); // derived from the backing list only, matching the list comparison in equals
+    }
+
+    @Override
+    public final String toString()
+    {
+        return list.toString(); // renders as the backing list does
+    }
+
+    static <C extends AbstractReplicaCollection<C>> C concat(C replicas, C extraReplicas, Mutable.Conflict ignoreConflicts)
+    { // returns replicas followed by extraReplicas; when either side is empty the other is returned as-is, without copying
+        if (extraReplicas.isEmpty())
+            return replicas;
+        if (replicas.isEmpty())
+            return extraReplicas;
+        Mutable<C> mutable = replicas.newMutable(replicas.size() + extraReplicas.size());
+        mutable.addAll(replicas);
+        mutable.addAll(extraReplicas, ignoreConflicts); // only the extras are added with the caller-supplied conflict policy
+        return mutable.asImmutableView();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 3e9b5bb..0ddc0a4 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -22,8 +22,8 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
+import com.google.common.base.Preconditions;
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,9 +73,9 @@ public abstract class AbstractReplicationStrategy
         // lazy-initialize keyspace itself since we don't create them until after the replication strategies
     }
 
-    private final Map<Token, ArrayList<InetAddressAndPort>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddressAndPort>>();
+    private final Map<Token, EndpointsForRange> cachedReplicas = new NonBlockingHashMap<>();
 
-    public ArrayList<InetAddressAndPort> getCachedEndpoints(Token t)
+    public EndpointsForRange getCachedReplicas(Token t)
     {
         long lastVersion = tokenMetadata.getRingVersion();
 
@@ -86,13 +86,13 @@ public abstract class AbstractReplicationStrategy
                 if (lastVersion > lastInvalidatedVersion)
                 {
                     logger.trace("clearing cached endpoints");
-                    cachedEndpoints.clear();
+                    cachedReplicas.clear();
                     lastInvalidatedVersion = lastVersion;
                 }
             }
         }
 
-        return cachedEndpoints.get(t);
+        return cachedReplicas.get(t);
     }
 
     /**
@@ -102,64 +102,65 @@ public abstract class AbstractReplicationStrategy
      * @param searchPosition the position the natural endpoints are requested for
      * @return a copy of the natural endpoints for the given token
      */
-    public ArrayList<InetAddressAndPort> getNaturalEndpoints(RingPosition searchPosition)
+    public EndpointsForToken getNaturalReplicasForToken(RingPosition searchPosition)
+    {
+        return getNaturalReplicas(searchPosition).forToken(searchPosition.getToken());
+    }
+
+    public EndpointsForRange getNaturalReplicas(RingPosition searchPosition)
     {
         Token searchToken = searchPosition.getToken();
         Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
-        ArrayList<InetAddressAndPort> endpoints = getCachedEndpoints(keyToken);
+        EndpointsForRange endpoints = getCachedReplicas(keyToken);
         if (endpoints == null)
         {
             TokenMetadata tm = tokenMetadata.cachedOnlyTokenMap();
             // if our cache got invalidated, it's possible there is a new token to account for too
             keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken);
-            endpoints = new ArrayList<InetAddressAndPort>(calculateNaturalEndpoints(searchToken, tm));
-            cachedEndpoints.put(keyToken, endpoints);
+            endpoints = calculateNaturalReplicas(searchToken, tm);
+            cachedReplicas.put(keyToken, endpoints);
         }
 
-        return new ArrayList<InetAddressAndPort>(endpoints);
+        return endpoints;
     }
 
     /**
      * calculate the natural endpoints for the given token
      *
-     * @see #getNaturalEndpoints(org.apache.cassandra.dht.RingPosition)
+     * @see #getNaturalReplicasForToken(org.apache.cassandra.dht.RingPosition)
      *
      * @param searchToken the token the natural endpoints are requested for
      * @return a copy of the natural endpoints for the given token
      */
-    public abstract List<InetAddressAndPort> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata);
+    public abstract EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata);
 
-    public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints,
-                                                                       Collection<InetAddressAndPort> pendingEndpoints,
-                                                                       ConsistencyLevel consistency_level,
+    public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
                                                                        Runnable callback,
                                                                        WriteType writeType,
                                                                        long queryStartNanoTime)
     {
-        return getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel());
+        return getWriteResponseHandler(replicaLayout, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel());
     }
 
-    public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints,
-                                                                       Collection<InetAddressAndPort> pendingEndpoints,
-                                                                       ConsistencyLevel consistency_level,
+    public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
                                                                        Runnable callback,
                                                                        WriteType writeType,
                                                                        long queryStartNanoTime,
                                                                        ConsistencyLevel idealConsistencyLevel)
     {
         AbstractWriteResponseHandler resultResponseHandler;
-        if (consistency_level.isDatacenterLocal())
+        if (replicaLayout.consistencyLevel.isDatacenterLocal())
         {
             // block for in this context will be localnodes block.
-            resultResponseHandler = new DatacenterWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime);
+            resultResponseHandler = new DatacenterWriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime);
         }
-        else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
+        else if (replicaLayout.consistencyLevel == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
         {
-            resultResponseHandler = new DatacenterSyncWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime);
+            resultResponseHandler = new DatacenterSyncWriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime);
         }
         else
         {
-            resultResponseHandler = new WriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime);
+            resultResponseHandler = new WriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime);
         }
 
         //Check if tracking the ideal consistency level is configured
@@ -168,16 +169,14 @@ public abstract class AbstractReplicationStrategy
             //If ideal and requested are the same just use this handler to track the ideal consistency level
             //This is also used so that the ideal consistency level handler when constructed knows it is the ideal
             //one for tracking purposes
-            if (idealConsistencyLevel == consistency_level)
+            if (idealConsistencyLevel == replicaLayout.consistencyLevel)
             {
                 resultResponseHandler.setIdealCLResponseHandler(resultResponseHandler);
             }
             else
             {
                 //Construct a delegate response handler to use to track the ideal consistency level
-                AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(naturalEndpoints,
-                                                                                    pendingEndpoints,
-                                                                                    idealConsistencyLevel,
+                AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(replicaLayout.withConsistencyLevel(idealConsistencyLevel),
                                                                                     callback,
                                                                                     writeType,
                                                                                     queryStartNanoTime,
@@ -202,7 +201,12 @@ public abstract class AbstractReplicationStrategy
      *
      * @return the replication factor
      */
-    public abstract int getReplicationFactor();
+    public abstract ReplicationFactor getReplicationFactor();
+
+    public boolean hasTransientReplicas()
+    {
+        return getReplicationFactor().hasTransientReplicas();
+    }
 
     /*
      * NOTE: this is pretty inefficient. also the inverse (getRangeAddresses) below.
@@ -210,53 +214,81 @@ public abstract class AbstractReplicationStrategy
      * (fixing this would probably require merging tokenmetadata into replicationstrategy,
      * so we could cache/invalidate cleanly.)
      */
-    public Multimap<InetAddressAndPort, Range<Token>> getAddressRanges(TokenMetadata metadata)
+    public RangesByEndpoint getAddressReplicas(TokenMetadata metadata)
     {
-        Multimap<InetAddressAndPort, Range<Token>> map = HashMultimap.create();
+        RangesByEndpoint.Mutable map = new RangesByEndpoint.Mutable();
 
         for (Token token : metadata.sortedTokens())
         {
             Range<Token> range = metadata.getPrimaryRangeFor(token);
-            for (InetAddressAndPort ep : calculateNaturalEndpoints(token, metadata))
+            for (Replica replica : calculateNaturalReplicas(token, metadata))
             {
-                map.put(ep, range);
+                // LocalStrategy always returns (min, min] ranges for its replicas, so we skip the check here
+                Preconditions.checkState(range.equals(replica.range()) || this instanceof LocalStrategy);
+                map.put(replica.endpoint(), replica);
             }
         }
 
-        return map;
+        return map.asImmutableView();
     }
 
-    public Multimap<Range<Token>, InetAddressAndPort> getRangeAddresses(TokenMetadata metadata)
+    public RangesAtEndpoint getAddressReplicas(TokenMetadata metadata, InetAddressAndPort endpoint)
     {
-        Multimap<Range<Token>, InetAddressAndPort> map = HashMultimap.create();
+        RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(endpoint);
+        for (Token token : metadata.sortedTokens())
+        {
+            Range<Token> range = metadata.getPrimaryRangeFor(token);
+            Replica replica = calculateNaturalReplicas(token, metadata)
+                    .byEndpoint().get(endpoint);
+            if (replica != null)
+            {
+                // LocalStrategy always returns (min, min] ranges for its replicas, so we skip the check here
+                Preconditions.checkState(range.equals(replica.range()) || this instanceof LocalStrategy);
+                builder.add(replica, Conflict.DUPLICATE);
+            }
+        }
+        return builder.build();
+    }
+
+
+    public EndpointsByRange getRangeAddresses(TokenMetadata metadata)
+    {
+        EndpointsByRange.Mutable map = new EndpointsByRange.Mutable();
 
         for (Token token : metadata.sortedTokens())
         {
             Range<Token> range = metadata.getPrimaryRangeFor(token);
-            for (InetAddressAndPort ep : calculateNaturalEndpoints(token, metadata))
+            for (Replica replica : calculateNaturalReplicas(token, metadata))
             {
-                map.put(range, ep);
+                // LocalStrategy always returns (min, min] ranges for its replicas, so we skip the check here
+                Preconditions.checkState(range.equals(replica.range()) || this instanceof LocalStrategy);
+                map.put(range, replica);
             }
         }
 
-        return map;
+        return map.asImmutableView();
     }
 
-    public Multimap<InetAddressAndPort, Range<Token>> getAddressRanges()
+    public RangesByEndpoint getAddressReplicas()
     {
-        return getAddressRanges(tokenMetadata.cloneOnlyTokenMap());
+        return getAddressReplicas(tokenMetadata.cloneOnlyTokenMap());
     }
 
-    public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddressAndPort pendingAddress)
+    public RangesAtEndpoint getAddressReplicas(InetAddressAndPort endpoint)
     {
-        return getPendingAddressRanges(metadata, Arrays.asList(pendingToken), pendingAddress);
+        return getAddressReplicas(tokenMetadata.cloneOnlyTokenMap(), endpoint);
     }
 
-    public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Collection<Token> pendingTokens, InetAddressAndPort pendingAddress)
+    public RangesAtEndpoint getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddressAndPort pendingAddress)
+    {
+        return getPendingAddressRanges(metadata, Collections.singleton(pendingToken), pendingAddress);
+    }
+
+    public RangesAtEndpoint getPendingAddressRanges(TokenMetadata metadata, Collection<Token> pendingTokens, InetAddressAndPort pendingAddress)
     {
         TokenMetadata temp = metadata.cloneOnlyTokenMap();
         temp.updateNormalTokens(pendingTokens, pendingAddress);
-        return getAddressRanges(temp).get(pendingAddress);
+        return getAddressReplicas(temp, pendingAddress);
     }
 
     public abstract void validateOptions() throws ConfigurationException;
@@ -329,6 +361,10 @@ public abstract class AbstractReplicationStrategy
         AbstractReplicationStrategy strategy = createInternal(keyspaceName, strategyClass, tokenMetadata, snitch, strategyOptions);
         strategy.validateExpectedOptions();
         strategy.validateOptions();
+        if (strategy.hasTransientReplicas() && !DatabaseDescriptor.isTransientReplicationEnabled())
+        {
+            throw new ConfigurationException("Transient replication is disabled. Enable in cassandra.yaml to use.");
+        }
     }
 
     public static Class<AbstractReplicationStrategy> getClass(String cls) throws ConfigurationException
@@ -344,21 +380,23 @@ public abstract class AbstractReplicationStrategy
 
     public boolean hasSameSettings(AbstractReplicationStrategy other)
     {
-        return getClass().equals(other.getClass()) && getReplicationFactor() == other.getReplicationFactor();
+        return getClass().equals(other.getClass()) && getReplicationFactor().equals(other.getReplicationFactor());
     }
 
-    protected void validateReplicationFactor(String rf) throws ConfigurationException
+    protected void validateReplicationFactor(String s) throws ConfigurationException
     {
         try
         {
-            if (Integer.parseInt(rf) < 0)
+            ReplicationFactor rf = ReplicationFactor.fromString(s);
+            if (rf.hasTransientReplicas())
             {
-                throw new ConfigurationException("Replication factor must be non-negative; found " + rf);
+                if (DatabaseDescriptor.getNumTokens() > 1)
+                    throw new ConfigurationException("Transient replication is not supported with vnodes yet");
             }
         }
-        catch (NumberFormatException e2)
+        catch (IllegalArgumentException e) // presumably raised by ReplicationFactor.fromString on malformed input
         {
-            throw new ConfigurationException("Replication factor must be numeric; found " + rf);
+            throw new ConfigurationException(e.getMessage(), e); // keep the original failure as the cause
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index 010c892..d35f1fb 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -42,7 +42,6 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
-
 /**
  * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
  */
@@ -185,55 +184,38 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
         return subsnitch.getDatacenter(endpoint);
     }
 
-    public List<InetAddressAndPort> getSortedListByProximity(final InetAddressAndPort address, Collection<InetAddressAndPort> addresses)
-    {
-        List<InetAddressAndPort> list = new ArrayList<>(addresses);
-        sortByProximity(address, list);
-        return list;
-    }
-
     @Override
-    public void sortByProximity(final InetAddressAndPort address, List<InetAddressAndPort> addresses)
+    public <C extends ReplicaCollection<? extends C>> C sortedByProximity(final InetAddressAndPort address, C unsortedAddresses)
     {
         assert address.equals(FBUtilities.getBroadcastAddressAndPort()); // we only know about ourself
-        if (dynamicBadnessThreshold == 0)
-        {
-            sortByProximityWithScore(address, addresses);
-        }
-        else
-        {
-            sortByProximityWithBadness(address, addresses);
-        }
+        return dynamicBadnessThreshold == 0
+                ? sortedByProximityWithScore(address, unsortedAddresses)
+                : sortedByProximityWithBadness(address, unsortedAddresses);
     }
 
-    private void sortByProximityWithScore(final InetAddressAndPort address, List<InetAddressAndPort> addresses)
+    private <C extends ReplicaCollection<? extends C>> C sortedByProximityWithScore(final InetAddressAndPort address, C unsortedAddresses)
     {
         // Scores can change concurrently from a call to this method. But Collections.sort() expects
         // its comparator to be "stable", that is 2 endpoint should compare the same way for the duration
         // of the sort() call. As we copy the scores map on write, it is thus enough to alias the current
         // version of it during this call.
         final HashMap<InetAddressAndPort, Double> scores = this.scores;
-        Collections.sort(addresses, new Comparator<InetAddressAndPort>()
-        {
-            public int compare(InetAddressAndPort a1, InetAddressAndPort a2)
-            {
-                return compareEndpoints(address, a1, a2, scores);
-            }
-        });
+        return unsortedAddresses.sorted((r1, r2) -> compareEndpoints(address, r1, r2, scores));
     }
 
-    private void sortByProximityWithBadness(final InetAddressAndPort address, List<InetAddressAndPort> addresses)
+    private <C extends ReplicaCollection<? extends C>> C sortedByProximityWithBadness(final InetAddressAndPort address, C replicas)
     {
-        if (addresses.size() < 2)
-            return;
+        if (replicas.size() < 2)
+            return replicas;
 
-        subsnitch.sortByProximity(address, addresses);
+        // TODO: avoid copy
+        replicas = subsnitch.sortedByProximity(address, replicas);
         HashMap<InetAddressAndPort, Double> scores = this.scores; // Make sure the score don't change in the middle of the loop below
                                                            // (which wouldn't really matter here but its cleaner that way).
-        ArrayList<Double> subsnitchOrderedScores = new ArrayList<>(addresses.size());
-        for (InetAddressAndPort inet : addresses)
+        ArrayList<Double> subsnitchOrderedScores = new ArrayList<>(replicas.size());
+        for (Replica replica : replicas)
         {
-            Double score = scores.get(inet);
+            Double score = scores.get(replica.endpoint());
             if (score == null)
                 score = 0.0;
             subsnitchOrderedScores.add(score);
@@ -250,17 +232,18 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
         {
             if (subsnitchScore > (sortedScoreIterator.next() * (1.0 + dynamicBadnessThreshold)))
             {
-                sortByProximityWithScore(address, addresses);
-                return;
+                return sortedByProximityWithScore(address, replicas);
             }
         }
+
+        return replicas;
     }
 
     // Compare endpoints given an immutable snapshot of the scores
-    private int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2, Map<InetAddressAndPort, Double> scores)
+    private int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2, Map<InetAddressAndPort, Double> scores)
     {
-        Double scored1 = scores.get(a1);
-        Double scored2 = scores.get(a2);
+        Double scored1 = scores.get(a1.endpoint());
+        Double scored2 = scores.get(a2.endpoint());
         
         if (scored1 == null)
         {
@@ -280,7 +263,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
             return 1;
     }
 
-    public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
+    public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2)
     {
         // That function is fundamentally unsafe because the scores can change at any time and so the result of that
         // method is not stable for identical arguments. This is why we don't rely on super.sortByProximity() in
@@ -414,7 +397,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
         return getSeverity(FBUtilities.getBroadcastAddressAndPort());
     }
 
-    public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2)
+    public boolean isWorthMergingForRangeQuery(ReplicaCollection<?> merged, ReplicaCollection<?> l1, ReplicaCollection<?> l2)
     {
         if (!subsnitch.isWorthMergingForRangeQuery(merged, l1, l2))
             return false;
@@ -434,12 +417,12 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
     }
 
     // Return the max score for the endpoint in the provided list, or -1.0 if no node have a score.
-    private double maxScore(List<InetAddressAndPort> endpoints)
+    private double maxScore(ReplicaCollection<?> endpoints)
     {
         double maxScore = -1.0;
-        for (InetAddressAndPort endpoint : endpoints)
+        for (Replica replica : endpoints)
         {
-            Double score = scores.get(endpoint);
+            Double score = scores.get(replica.endpoint());
             if (score == null)
                 continue;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/Ec2Snitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Ec2Snitch.java b/src/java/org/apache/cassandra/locator/Ec2Snitch.java
index b6aafd3..d0474e4 100644
--- a/src/java/org/apache/cassandra/locator/Ec2Snitch.java
+++ b/src/java/org/apache/cassandra/locator/Ec2Snitch.java
@@ -68,7 +68,7 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch
     {
         String az = awsApiCall(ZONE_NAME_QUERY_URL);
 
-        // if using the full naming scheme, region name is created by removing letters from the 
+        // if using the full naming scheme, region name is created by removing letters from the
         // end of the availability zone and zone is the full zone name
         usingLegacyNaming = isUsingLegacyNaming(props);
         String region;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/Endpoints.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Endpoints.java b/src/java/org/apache/cassandra/locator/Endpoints.java
new file mode 100644
index 0000000..3d5faa4
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/Endpoints.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
+import org.apache.cassandra.utils.FBUtilities;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaCollection<E>
+{
+    static final Map<InetAddressAndPort, Replica> EMPTY_MAP = Collections.unmodifiableMap(new LinkedHashMap<>());
+
+    volatile Map<InetAddressAndPort, Replica> byEndpoint;
+
+    Endpoints(List<Replica> list, boolean isSnapshot, Map<InetAddressAndPort, Replica> byEndpoint)
+    {
+        super(list, isSnapshot);
+        this.byEndpoint = byEndpoint;
+    }
+
+    @Override
+    public Set<InetAddressAndPort> endpoints()
+    {
+        return byEndpoint().keySet();
+    }
+
+    public Map<InetAddressAndPort, Replica> byEndpoint()
+    {
+        Map<InetAddressAndPort, Replica> map = byEndpoint;
+        if (map == null)
+            byEndpoint = map = buildByEndpoint(list);
+        return map;
+    }
+
+    public boolean contains(InetAddressAndPort endpoint, boolean isFull)
+    {
+        Replica replica = byEndpoint().get(endpoint);
+        return replica != null && replica.isFull() == isFull;
+    }
+
+    @Override
+    public boolean contains(Replica replica)
+    {
+        return replica != null
+                && Objects.equals(
+                        byEndpoint().get(replica.endpoint()),
+                        replica);
+    }
+
+    private static Map<InetAddressAndPort, Replica> buildByEndpoint(List<Replica> list)
+    {
+        // TODO: implement a delegating map that uses our superclass' list, and is immutable
+        Map<InetAddressAndPort, Replica> byEndpoint = new LinkedHashMap<>(list.size()); // keeps list order
+        for (Replica replica : list)
+        {
+            Replica prev = byEndpoint.put(replica.endpoint(), replica);
+            assert prev == null : "duplicate endpoint in Endpoints: " + prev + " and " + replica;
+        }
+
+        return Collections.unmodifiableMap(byEndpoint);
+    }
+
+    public E withoutSelf()
+    {
+        InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
+        return filter(r -> !self.equals(r.endpoint()));
+    }
+
+    public E without(Set<InetAddressAndPort> remove)
+    {
+        return filter(r -> !remove.contains(r.endpoint()));
+    }
+
+    public E keep(Set<InetAddressAndPort> keep)
+    {
+        return filter(r -> keep.contains(r.endpoint()));
+    }
+
+    public E keep(Iterable<InetAddressAndPort> endpoints)
+    {
+        ReplicaCollection.Mutable<E> copy = newMutable(
+                endpoints instanceof Collection<?>
+                        ? ((Collection<InetAddressAndPort>) endpoints).size()
+                        : size()
+        );
+        Map<InetAddressAndPort, Replica> byEndpoint = byEndpoint();
+        for (InetAddressAndPort endpoint : endpoints)
+        {
+            Replica keep = byEndpoint.get(endpoint);
+            if (keep == null)
+                continue;
+            copy.add(keep, ReplicaCollection.Mutable.Conflict.DUPLICATE);
+        }
+        return copy.asSnapshot();
+    }
+
+    /**
+     * Care must be taken to ensure no conflicting ranges occur in pending and natural.
+     * Conflicts can occur for two reasons:
+     *   1) due to lack of isolation when reading pending/natural
+     *   2) because a movement that changes the type of replication from transient to full must be handled
+     *      differently for reads and writes (with the reader treating it as transient, and writer as full)
+     *
+     * The method haveConflicts() below, and resolveConflictsInX, are used to detect and resolve any issues
+     */
+    public static <E extends Endpoints<E>> E concat(E natural, E pending)
+    {
+        return AbstractReplicaCollection.concat(natural, pending, Conflict.NONE);
+    }
+
+    public static <E extends Endpoints<E>> boolean haveConflicts(E natural, E pending)
+    {
+        Set<InetAddressAndPort> naturalEndpoints = natural.endpoints();
+        for (InetAddressAndPort pendingEndpoint : pending.endpoints())
+        {
+            if (naturalEndpoints.contains(pendingEndpoint))
+                return true;
+        }
+        return false;
+    }
+
+    // must apply first
+    public static <E extends Endpoints<E>> E resolveConflictsInNatural(E natural, E pending)
+    {
+        return natural.filter(r -> !r.isTransient() || !pending.contains(r.endpoint(), true));
+    }
+
+    // must apply second
+    public static <E extends Endpoints<E>> E resolveConflictsInPending(E natural, E pending)
+    {
+        return pending.without(natural.endpoints());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/EndpointsByRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointsByRange.java b/src/java/org/apache/cassandra/locator/EndpointsByRange.java
new file mode 100644
index 0000000..cdc8a68
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/EndpointsByRange.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class EndpointsByRange extends ReplicaMultimap<Range<Token>, EndpointsForRange>
+{
+    public EndpointsByRange(Map<Range<Token>, EndpointsForRange> map)
+    {
+        super(map);
+    }
+
+    public EndpointsForRange get(Range<Token> range)
+    {
+        Preconditions.checkNotNull(range);
+        return map.getOrDefault(range, EndpointsForRange.empty(range));
+    }
+
+    public static class Mutable extends ReplicaMultimap.Mutable<Range<Token>, EndpointsForRange.Mutable>
+    {
+        @Override
+        protected EndpointsForRange.Mutable newMutable(Range<Token> range)
+        {
+            return new EndpointsForRange.Mutable(range);
+        }
+
+        // TODO: consider all ignoreDuplicates cases
+        public void putAll(Range<Token> range, EndpointsForRange replicas, Conflict ignoreConflicts)
+        {
+            get(range).addAll(replicas, ignoreConflicts);
+        }
+
+        public EndpointsByRange asImmutableView()
+        {
+            return new EndpointsByRange(Collections.unmodifiableMap(Maps.transformValues(map, EndpointsForRange.Mutable::asImmutableView)));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/EndpointsByReplica.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointsByReplica.java b/src/java/org/apache/cassandra/locator/EndpointsByReplica.java
new file mode 100644
index 0000000..ceea2d1
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/EndpointsByReplica.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Maps a {@link Replica} to the {@link EndpointsForRange} held for it.
+ */
+public class EndpointsByReplica extends ReplicaMultimap<Replica, EndpointsForRange>
+{
+    /**
+     * @param map the replica-to-endpoints mapping, handed straight to the superclass constructor
+     */
+    public EndpointsByReplica(Map<Replica, EndpointsForRange> map)
+    {
+        super(map);
+    }
+
+    /**
+     * Looks up the endpoints stored for {@code replica}.
+     *
+     * @param replica the key to look up; must not be null
+     * @return the stored endpoints, or an empty {@link EndpointsForRange} over {@code replica.range()} when
+     *         there is no entry
+     * @throws NullPointerException if {@code replica} is null
+     */
+    public EndpointsForRange get(Replica replica)
+    {
+        Preconditions.checkNotNull(replica);
+        return map.getOrDefault(replica, EndpointsForRange.empty(replica.range()));
+    }
+
+    /**
+     * Mutable counterpart whose values are {@link EndpointsForRange.Mutable} instances.
+     */
+    public static class Mutable extends ReplicaMultimap.Mutable<Replica, EndpointsForRange.Mutable>
+    {
+        /**
+         * Creates the empty collection used for a key that has no value yet; it is defined over the key's range.
+         */
+        @Override
+        protected EndpointsForRange.Mutable newMutable(Replica replica)
+        {
+            return new EndpointsForRange.Mutable(replica.range());
+        }
+
+        /**
+         * Adds every member of {@code replicas} to the collection stored under {@code replica}, creating that
+         * collection via {@link #newMutable(Replica)} if the key is not yet present.
+         *
+         * @param replica         the key whose collection receives the replicas
+         * @param replicas        the replicas to add
+         * @param ignoreConflicts the conflict policy forwarded to {@code addAll}
+         */
+        // TODO: consider all ignoreDuplicates cases
+        public void putAll(Replica replica, EndpointsForRange replicas, Conflict ignoreConflicts)
+        {
+            map.computeIfAbsent(replica, this::newMutable).addAll(replicas, ignoreConflicts);
+        }
+
+        /**
+         * Exposes this multimap as an {@link EndpointsByReplica} without copying any entries: the values are
+         * presented through {@link EndpointsForRange.Mutable#asImmutableView()} and the resulting map view is
+         * wrapped so that it cannot be modified.
+         */
+        public EndpointsByReplica asImmutableView()
+        {
+            return new EndpointsByReplica(Collections.unmodifiableMap(Maps.transformValues(map, EndpointsForRange.Mutable::asImmutableView)));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/EndpointsForRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointsForRange.java b/src/java/org/apache/cassandra/locator/EndpointsForRange.java
new file mode 100644
index 0000000..c2d8232
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/EndpointsForRange.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.base.Preconditions;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.collect.Iterables.all;
+
+/**
+ * A ReplicaCollection where all Replica are required to cover a range that fully contains the range() defined in the builder().
+ * Endpoints are guaranteed to be unique; on construction, this is enforced unless optionally silenced (in which case
+ * only the first occurrence makes the cut).
+ */
+public class EndpointsForRange extends Endpoints<EndpointsForRange>
+{
+    /** The range this collection is defined for; asserted non-null in the constructor. */
+    private final Range<Token> range;
+
+    /**
+     * Creates a collection without a pre-built endpoint index (null is passed on for {@code byEndpoint}).
+     */
+    private EndpointsForRange(Range<Token> range, List<Replica> list, boolean isSnapshot)
+    {
+        this(range, list, isSnapshot, null);
+    }
+
+    /**
+     * @param range      the range this collection is defined for; must not be null (checked by assertion only)
+     * @param list       the replicas, passed through to the superclass
+     * @param isSnapshot passed through to the superclass
+     * @param byEndpoint endpoint-to-replica index passed through to the superclass; may be null
+     */
+    private EndpointsForRange(Range<Token> range, List<Replica> list, boolean isSnapshot, Map<InetAddressAndPort, Replica> byEndpoint)
+    {
+        super(list, isSnapshot, byEndpoint);
+        this.range = range;
+        assert range != null;
+    }
+
+    /**
+     * @return the range this collection is defined for
+     */
+    public Range<Token> range()
+    {
+        return range;
+    }
+
+    /**
+     * @return a new, empty {@link Mutable} over the same range with the requested initial capacity
+     */
+    @Override
+    public Mutable newMutable(int initialCapacity)
+    {
+        return new Mutable(range, initialCapacity);
+    }
+
+    /**
+     * Re-targets this collection at a single token inside its range. The result reuses this instance's list,
+     * snapshot flag and endpoint index; nothing is copied.
+     *
+     * @throws IllegalArgumentException if {@code token} is not contained in {@link #range()}
+     */
+    public EndpointsForToken forToken(Token token)
+    {
+        if (!range.contains(token))
+            throw new IllegalArgumentException(token + " is not contained within " + range);
+        return new EndpointsForToken(token, list, isSnapshot, byEndpoint);
+    }
+
+    @Override
+    public EndpointsForRange self()
+    {
+        return this;
+    }
+
+    /**
+     * Wraps {@code snapshot} as a snapshot collection over the same range; an empty list yields {@link #empty(Range)}.
+     */
+    @Override
+    protected EndpointsForRange snapshot(List<Replica> snapshot)
+    {
+        if (snapshot.isEmpty()) return empty(range);
+        return new EndpointsForRange(range, snapshot, true);
+    }
+
+    /**
+     * Mutable variant that accepts replicas through {@link #add(Replica, Conflict)} until {@link #asSnapshot()}
+     * has been called.
+     */
+    public static class Mutable extends EndpointsForRange implements ReplicaCollection.Mutable<EndpointsForRange>
+    {
+        /** Set by {@link #asSnapshot()}; once true, {@link #add(Replica, Conflict)} refuses further changes. */
+        boolean hasSnapshot;
+        public Mutable(Range<Token> range) { this(range, 0); }
+        public Mutable(Range<Token> range, int capacity) { super(range, new ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+
+        /**
+         * Adds {@code replica} unless a replica for the same endpoint is already present, in which case the
+         * existing entry is always kept and {@code ignoreConflict} decides whether that is an error.
+         *
+         * @param replica        the replica to add; must not be null and its range must contain this collection's range
+         * @param ignoreConflict {@code NONE}: any existing entry for the endpoint is an error; {@code DUPLICATE}:
+         *                       an equal existing entry is ignored, a different one is an error; {@code ALL}:
+         *                       any existing entry is silently kept
+         * @throws IllegalStateException    if {@link #asSnapshot()} has already been called
+         * @throws NullPointerException     if {@code replica} is null
+         * @throws IllegalArgumentException if the replica does not contain this range, or on a disallowed conflict
+         */
+        public void add(Replica replica, Conflict ignoreConflict)
+        {
+            if (hasSnapshot) throw new IllegalStateException("Cannot add to a Mutable after asSnapshot() has been called");
+            Preconditions.checkNotNull(replica);
+            if (!replica.range().contains(super.range))
+                throw new IllegalArgumentException("Replica " + replica + " does not contain " + super.range);
+
+            // putIfAbsent never overwrites, so the index is not disturbed when the endpoint is already present
+            Replica prev = super.byEndpoint.putIfAbsent(replica.endpoint(), replica);
+            if (prev != null)
+            {
+                switch (ignoreConflict)
+                {
+                    case DUPLICATE:
+                        if (prev.equals(replica))
+                            break;
+                        // deliberate fall-through: a different replica for the same endpoint is a conflict
+                    case NONE:
+                        throw new IllegalArgumentException("Conflicting replica added (expected unique endpoints): " + replica + "; existing: " + prev);
+                    case ALL:
+                        // conflict tolerated; the first occurrence stays
+                }
+                return;
+            }
+
+            list.add(replica);
+        }
+
+        @Override
+        public Map<InetAddressAndPort, Replica> byEndpoint()
+        {
+            // our internal map is modifiable, but it is unsafe to modify the map externally
+            // it would be possible to implement a safe modifiable map, but it is probably not valuable
+            return Collections.unmodifiableMap(super.byEndpoint());
+        }
+
+        /**
+         * Builds an EndpointsForRange that shares this instance's list and (read-only wrapped) endpoint index.
+         */
+        private EndpointsForRange get(boolean isSnapshot)
+        {
+            return new EndpointsForRange(super.range, super.list, isSnapshot, Collections.unmodifiableMap(super.byEndpoint));
+        }
+
+        /**
+         * @return a non-snapshot EndpointsForRange backed by this instance's list and endpoint index, so later
+         *         additions remain visible through it
+         */
+        public EndpointsForRange asImmutableView()
+        {
+            return get(false);
+        }
+
+        /**
+         * Returns a snapshot that shares this instance's storage instead of copying it, and marks this
+         * instance so that any later {@link #add(Replica, Conflict)} fails.
+         */
+        public EndpointsForRange asSnapshot()
+        {
+            hasSnapshot = true;
+            return get(true);
+        }
+    }
+
+    /**
+     * Builder backed by a {@link Mutable} over the given range.
+     */
+    public static class Builder extends ReplicaCollection.Builder<EndpointsForRange, Mutable, EndpointsForRange.Builder>
+    {
+        public Builder(Range<Token> range) { this(range, 0); }
+        public Builder(Range<Token> range, int capacity) { super (new Mutable(range, capacity)); }
+
+        /**
+         * @return true if the underlying mutable collection already indexes a replica for {@code endpoint}
+         */
+        public boolean containsEndpoint(InetAddressAndPort endpoint)
+        {
+            return mutable.asImmutableView().byEndpoint.containsKey(endpoint);
+        }
+    }
+
+    public static Builder builder(Range<Token> range)
+    {
+        return new Builder(range);
+    }
+    public static Builder builder(Range<Token> range, int capacity)
+    {
+        return new Builder(range, capacity);
+    }
+
+    /**
+     * @return an empty snapshot collection defined over {@code range}
+     */
+    public static EndpointsForRange empty(Range<Token> range)
+    {
+        return new EndpointsForRange(range, EMPTY_LIST, true, EMPTY_MAP);
+    }
+
+    /**
+     * @return a snapshot collection holding only {@code replica}, defined over the replica's own range
+     */
+    public static EndpointsForRange of(Replica replica)
+    {
+        // we only use ArrayList or ArrayList.SubList, to ensure callsites are bimorphic
+        ArrayList<Replica> one = new ArrayList<>(1);
+        one.add(replica);
+        // we can safely use singletonMap, as we only otherwise use LinkedHashMap
+        return new EndpointsForRange(replica.range(), one, true, Collections.unmodifiableMap(Collections.singletonMap(replica.endpoint(), replica)));
+    }
+
+    /**
+     * Varargs convenience for {@link #copyOf(Collection)}; the same preconditions apply.
+     */
+    public static EndpointsForRange of(Replica ... replicas)
+    {
+        return copyOf(Arrays.asList(replicas));
+    }
+
+    /**
+     * Builds a collection from {@code replicas}, using the first replica's range as the collection's range.
+     * When assertions are enabled, every replica is checked to have exactly that range.
+     *
+     * @throws IllegalArgumentException if {@code replicas} is empty (there is no range to take)
+     */
+    public static EndpointsForRange copyOf(Collection<Replica> replicas)
+    {
+        if (replicas.isEmpty())
+            throw new IllegalArgumentException("Collection must be non-empty to copy");
+        Range<Token> range = replicas.iterator().next().range();
+        assert all(replicas, r -> range.equals(r.range()));
+        return builder(range, replicas.size()).addAll(replicas).build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/EndpointsForToken.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointsForToken.java b/src/java/org/apache/cassandra/locator/EndpointsForToken.java
new file mode 100644
index 0000000..f24c615
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/EndpointsForToken.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.base.Preconditions;
+import org.apache.cassandra.dht.Token;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A ReplicaCollection where all Replica are required to cover a range that fully contains the token() defined in the builder().
+ * Endpoints are guaranteed to be unique; on construction, this is enforced unless optionally silenced (in which case
+ * only the first occurrence makes the cut).
+ */
+public class EndpointsForToken extends Endpoints<EndpointsForToken>
+{
+    /** The token this collection is defined for; asserted non-null in the constructor. */
+    private final Token token;
+
+    /**
+     * Creates a collection without a pre-built endpoint index (null is passed on for {@code byEndpoint}).
+     */
+    private EndpointsForToken(Token token, List<Replica> list, boolean isSnapshot)
+    {
+        this(token, list, isSnapshot, null);
+    }
+
+    /**
+     * @param token      the token this collection is defined for; must not be null (checked by assertion only)
+     * @param list       the replicas, passed through to the superclass
+     * @param isSnapshot passed through to the superclass
+     * @param byEndpoint endpoint-to-replica index passed through to the superclass; may be null
+     */
+    EndpointsForToken(Token token, List<Replica> list, boolean isSnapshot, Map<InetAddressAndPort, Replica> byEndpoint)
+    {
+        super(list, isSnapshot, byEndpoint);
+        this.token = token;
+        assert token != null;
+    }
+
+    /**
+     * @return the token this collection is defined for
+     */
+    public Token token()
+    {
+        return token;
+    }
+
+    /**
+     * @return a new, empty {@link Mutable} for the same token with the requested initial capacity
+     */
+    @Override
+    public Mutable newMutable(int initialCapacity)
+    {
+        return new Mutable(token, initialCapacity);
+    }
+
+    @Override
+    public EndpointsForToken self()
+    {
+        return this;
+    }
+
+    /**
+     * Wraps {@code subList} as a snapshot collection for the same token; an empty list yields {@link #empty(Token)}.
+     */
+    @Override
+    protected EndpointsForToken snapshot(List<Replica> subList)
+    {
+        if (subList.isEmpty()) return empty(token);
+        return new EndpointsForToken(token, subList, true);
+    }
+
+    /**
+     * Mutable variant that accepts replicas through {@link #add(Replica, Conflict)} until {@link #asSnapshot()}
+     * has been called.
+     */
+    public static class Mutable extends EndpointsForToken implements ReplicaCollection.Mutable<EndpointsForToken>
+    {
+        /** Set by {@link #asSnapshot()}; once true, {@link #add(Replica, Conflict)} refuses further changes. */
+        boolean hasSnapshot;
+        public Mutable(Token token) { this(token, 0); }
+        public Mutable(Token token, int capacity) { super(token, new ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+
+        /**
+         * Adds {@code replica} unless a replica for the same endpoint is already present, in which case the
+         * existing entry is always kept and {@code ignoreConflict} decides whether that is an error.
+         *
+         * @param replica        the replica to add; must not be null and its range must contain this collection's token
+         * @param ignoreConflict {@code NONE}: any existing entry for the endpoint is an error; {@code DUPLICATE}:
+         *                       an equal existing entry is ignored, a different one is an error; {@code ALL}:
+         *                       any existing entry is silently kept
+         * @throws IllegalStateException    if {@link #asSnapshot()} has already been called
+         * @throws NullPointerException     if {@code replica} is null
+         * @throws IllegalArgumentException if the replica does not contain this token, or on a disallowed conflict
+         */
+        public void add(Replica replica, Conflict ignoreConflict)
+        {
+            if (hasSnapshot) throw new IllegalStateException("Cannot add to a Mutable after asSnapshot() has been called");
+            Preconditions.checkNotNull(replica);
+            if (!replica.range().contains(super.token))
+                throw new IllegalArgumentException("Replica " + replica + " does not contain " + super.token);
+
+            // putIfAbsent never overwrites, so the index is not disturbed when the endpoint is already present
+            Replica prev = super.byEndpoint.putIfAbsent(replica.endpoint(), replica);
+            if (prev != null)
+            {
+                switch (ignoreConflict)
+                {
+                    case DUPLICATE:
+                        if (prev.equals(replica))
+                            break;
+                        // deliberate fall-through: a different replica for the same endpoint is a conflict
+                    case NONE:
+                        throw new IllegalArgumentException("Conflicting replica added (expected unique endpoints): " + replica + "; existing: " + prev);
+                    case ALL:
+                        // conflict tolerated; the first occurrence stays
+                }
+                return;
+            }
+
+            list.add(replica);
+        }
+
+        @Override
+        public Map<InetAddressAndPort, Replica> byEndpoint()
+        {
+            // our internal map is modifiable, but it is unsafe to modify the map externally
+            // it would be possible to implement a safe modifiable map, but it is probably not valuable
+            return Collections.unmodifiableMap(super.byEndpoint());
+        }
+
+        /**
+         * Builds an EndpointsForToken that shares this instance's list and (read-only wrapped) endpoint index.
+         */
+        private EndpointsForToken get(boolean isSnapshot)
+        {
+            return new EndpointsForToken(super.token, super.list, isSnapshot, Collections.unmodifiableMap(super.byEndpoint));
+        }
+
+        /**
+         * @return a non-snapshot EndpointsForToken backed by this instance's list and endpoint index, so later
+         *         additions remain visible through it
+         */
+        public EndpointsForToken asImmutableView()
+        {
+            return get(false);
+        }
+
+        /**
+         * Returns a snapshot that shares this instance's storage instead of copying it, and marks this
+         * instance so that any later {@link #add(Replica, Conflict)} fails.
+         */
+        public EndpointsForToken asSnapshot()
+        {
+            hasSnapshot = true;
+            return get(true);
+        }
+    }
+
+    /**
+     * Builder backed by a {@link Mutable} for the given token.
+     */
+    public static class Builder extends ReplicaCollection.Builder<EndpointsForToken, Mutable, EndpointsForToken.Builder>
+    {
+        public Builder(Token token) { this(token, 0); }
+        public Builder(Token token, int capacity) { super (new Mutable(token, capacity)); }
+    }
+
+    public static Builder builder(Token token)
+    {
+        return new Builder(token);
+    }
+    public static Builder builder(Token token, int capacity)
+    {
+        return new Builder(token, capacity);
+    }
+
+    /**
+     * @return an empty snapshot collection defined for {@code token}
+     */
+    public static EndpointsForToken empty(Token token)
+    {
+        return new EndpointsForToken(token, EMPTY_LIST, true, EMPTY_MAP);
+    }
+
+    /**
+     * @return a snapshot collection for {@code token} holding only {@code replica}
+     */
+    public static EndpointsForToken of(Token token, Replica replica)
+    {
+        // we only use ArrayList or ArrayList.SubList, to ensure callsites are bimorphic
+        ArrayList<Replica> one = new ArrayList<>(1);
+        one.add(replica);
+        // we can safely use singletonMap, as we only otherwise use LinkedHashMap
+        return new EndpointsForToken(token, one, true, Collections.unmodifiableMap(Collections.singletonMap(replica.endpoint(), replica)));
+    }
+
+    /**
+     * Varargs convenience for {@link #copyOf(Token, Collection)}.
+     */
+    public static EndpointsForToken of(Token token, Replica ... replicas)
+    {
+        return copyOf(token, Arrays.asList(replicas));
+    }
+
+    /**
+     * Builds a collection for {@code token} from {@code replicas}; an empty input yields {@link #empty(Token)}.
+     */
+    public static EndpointsForToken copyOf(Token token, Collection<Replica> replicas)
+    {
+        if (replicas.isEmpty()) return empty(token);
+        return builder(token, replicas.size()).addAll(replicas).build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
index 63d333b..b7797b0 100644
--- a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.locator;
 
-import java.util.Collection;
-import java.util.List;
 import java.util.Set;
 
 /**
@@ -39,20 +37,20 @@ public interface IEndpointSnitch
      */
     public String getDatacenter(InetAddressAndPort endpoint);
 
-    /**
-     * returns a new <tt>List</tt> sorted by proximity to the given endpoint
-     */
-    public List<InetAddressAndPort> getSortedListByProximity(InetAddressAndPort address, Collection<InetAddressAndPort> unsortedAddress);
+    default public String getDatacenter(Replica replica)
+    {
+        return getDatacenter(replica.endpoint());
+    }
 
     /**
-     * This method will sort the <tt>List</tt> by proximity to the given address.
+     * returns the replicas of the given collection, sorted by proximity to the given endpoint
      */
-    public void sortByProximity(InetAddressAndPort address, List<InetAddressAndPort> addresses);
+    public <C extends ReplicaCollection<? extends C>> C sortedByProximity(final InetAddressAndPort address, C addresses);
 
     /**
      * compares two endpoints in relation to the target endpoint, returning as Comparator.compare would
      */
-    public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2);
+    public int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2);
 
     /**
      * called after Gossiper instance exists immediately before it starts gossiping
@@ -63,7 +61,7 @@ public interface IEndpointSnitch
      * Returns whether for a range query doing a query against merged is likely
      * to be faster than 2 sequential queries, one against l1 followed by one against l2.
      */
-    public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2);
+    public boolean isWorthMergingForRangeQuery(ReplicaCollection<?> merged, ReplicaCollection<?> l1, ReplicaCollection<?> l2);
 
     /**
      * Determine if the datacenter or rack values in the current node's snitch conflict with those passed in parameters.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
index 38a1a49..a47c72a 100644
--- a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
+++ b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
@@ -25,6 +25,7 @@ import java.net.UnknownHostException;
 import com.google.common.base.Preconditions;
 import com.google.common.net.HostAndPort;
 
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FastByteOperations;
 
 /**
@@ -191,9 +192,9 @@ public final class InetAddressAndPort implements Comparable<InetAddressAndPort>,
         return InetAddressAndPort.getByAddress(InetAddress.getLoopbackAddress());
     }
 
-    public static InetAddressAndPort getLocalHost() throws UnknownHostException
+    public static InetAddressAndPort getLocalHost()
     {
-        return InetAddressAndPort.getByAddress(InetAddress.getLocalHost());
+        return FBUtilities.getLocalAddressAndPort();
     }
 
     public static void initializeDefaultPort(int port)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/LocalStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/LocalStrategy.java b/src/java/org/apache/cassandra/locator/LocalStrategy.java
index a76fe96..41cc9b0 100644
--- a/src/java/org/apache/cassandra/locator/LocalStrategy.java
+++ b/src/java/org/apache/cassandra/locator/LocalStrategy.java
@@ -17,12 +17,11 @@
  */
 package org.apache.cassandra.locator;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.dht.RingPosition;
 import org.apache.cassandra.dht.Token;
@@ -30,32 +29,40 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public class LocalStrategy extends AbstractReplicationStrategy
 {
+    private static final ReplicationFactor RF = ReplicationFactor.fullOnly(1);
+    private final EndpointsForRange replicas;
+
     public LocalStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
     {
         super(keyspaceName, tokenMetadata, snitch, configOptions);
+        replicas = EndpointsForRange.of(
+                new Replica(FBUtilities.getBroadcastAddressAndPort(),
+                        DatabaseDescriptor.getPartitioner().getMinimumToken(),
+                        DatabaseDescriptor.getPartitioner().getMinimumToken(),
+                        true
+                )
+        );
     }
 
     /**
-     * We need to override this even if we override calculateNaturalEndpoints,
+     * We need to override this even if we override calculateNaturalReplicas,
      * because the default implementation depends on token calculations but
      * LocalStrategy may be used before tokens are set up.
      */
     @Override
-    public ArrayList<InetAddressAndPort> getNaturalEndpoints(RingPosition searchPosition)
+    public EndpointsForRange getNaturalReplicas(RingPosition searchPosition)
     {
-        ArrayList<InetAddressAndPort> l = new ArrayList<InetAddressAndPort>(1);
-        l.add(FBUtilities.getBroadcastAddressAndPort());
-        return l;
+        return replicas;
     }
 
-    public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+    public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
     {
-        return Collections.singletonList(FBUtilities.getBroadcastAddressAndPort());
+        return replicas;
     }
 
-    public int getReplicationFactor()
+    public ReplicationFactor getReplicationFactor()
     {
-        return 1;
+        return RF;
     }
 
     public void validateOptions() throws ConfigurationException


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org