You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:35:56 UTC

[04/18] cassandra git commit: Transient Replication and Cheap Quorums

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
index 9c90d57..4afeb5a 100644
--- a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.locator;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -39,9 +38,11 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.Pair;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class OldNetworkTopologyStrategyTest
 {
+
     private List<Token> keyTokens;
     private TokenMetadata tmd;
     private Map<String, ArrayList<InetAddressAndPort>> expectedResults;
@@ -53,7 +54,7 @@ public class OldNetworkTopologyStrategyTest
     }
 
     @Before
-    public void init()
+    public void init() throws Exception
     {
         keyTokens = new ArrayList<Token>();
         tmd = new TokenMetadata();
@@ -160,11 +161,11 @@ public class OldNetworkTopologyStrategyTest
     {
         for (Token keyToken : keyTokens)
         {
-            List<InetAddressAndPort> endpoints = strategy.getNaturalEndpoints(keyToken);
-            for (int j = 0; j < endpoints.size(); j++)
+            int j = 0;
+            for (InetAddressAndPort endpoint : strategy.getNaturalReplicasForToken(keyToken).endpoints())
             {
                 ArrayList<InetAddressAndPort> hostsExpected = expectedResults.get(keyToken.toString());
-                assertEquals(endpoints.get(j), hostsExpected.get(j));
+                assertEquals(endpoint, hostsExpected.get(j++));
             }
         }
     }
@@ -188,7 +189,6 @@ public class OldNetworkTopologyStrategyTest
         assertEquals(ranges.left.iterator().next().left, tokensAfterMove[movingNodeIdx]);
         assertEquals(ranges.left.iterator().next().right, tokens[movingNodeIdx]);
         assertEquals("No data should be fetched", ranges.right.size(), 0);
-
     }
 
     @Test
@@ -205,7 +205,6 @@ public class OldNetworkTopologyStrategyTest
         assertEquals("No data should be streamed", ranges.left.size(), 0);
         assertEquals(ranges.right.iterator().next().left, tokens[movingNodeIdx]);
         assertEquals(ranges.right.iterator().next().right, tokensAfterMove[movingNodeIdx]);
-
     }
 
     @SuppressWarnings("unchecked")
@@ -366,16 +365,21 @@ public class OldNetworkTopologyStrategyTest
         TokenMetadata tokenMetadataAfterMove = initTokenMetadata(tokensAfterMove);
         AbstractReplicationStrategy strategy = new OldNetworkTopologyStrategy("Keyspace1", tokenMetadataCurrent, endpointSnitch, optsWithRF(2));
 
-        Collection<Range<Token>> currentRanges = strategy.getAddressRanges().get(movingNode);
-        Collection<Range<Token>> updatedRanges = strategy.getPendingAddressRanges(tokenMetadataAfterMove, tokensAfterMove[movingNodeIdx], movingNode);
-
-        Pair<Set<Range<Token>>, Set<Range<Token>>> ranges = StorageService.instance.calculateStreamAndFetchRanges(currentRanges, updatedRanges);
+        RangesAtEndpoint currentRanges = strategy.getAddressReplicas().get(movingNode);
+        RangesAtEndpoint updatedRanges = strategy.getPendingAddressRanges(tokenMetadataAfterMove, tokensAfterMove[movingNodeIdx], movingNode);
 
-        return ranges;
+        return asRanges(StorageService.calculateStreamAndFetchRanges(currentRanges, updatedRanges));
     }
 
     private static Map<String, String> optsWithRF(int rf)
     {
         return Collections.singletonMap("replication_factor", Integer.toString(rf));
     }
+
+    public static Pair<Set<Range<Token>>, Set<Range<Token>>> asRanges(Pair<RangesAtEndpoint, RangesAtEndpoint> replicas)
+    {
+        Set<Range<Token>> leftRanges = replicas.left.ranges();
+        Set<Range<Token>> rightRanges = replicas.right.ranges();
+        return Pair.create(leftRanges, rightRanges);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
index 56fd181..8e0bc00 100644
--- a/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
+++ b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
@@ -26,7 +26,6 @@ import org.apache.cassandra.dht.Token;
 import org.junit.Test;
 
 import java.net.UnknownHostException;
-import java.util.Collection;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -38,17 +37,29 @@ public class PendingRangeMapsTest {
         return new Range<Token>(new BigIntegerToken(left), new BigIntegerToken(right));
     }
 
+    private static void addPendingRange(PendingRangeMaps pendingRangeMaps, Range<Token> range, String endpoint)
+    {
+        try
+        {
+            pendingRangeMaps.addPendingRange(range, Replica.fullReplica(InetAddressAndPort.getByName(endpoint), range));
+        }
+        catch (UnknownHostException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
     @Test
     public void testPendingEndpoints() throws UnknownHostException
     {
         PendingRangeMaps pendingRangeMaps = new PendingRangeMaps();
 
-        pendingRangeMaps.addPendingRange(genRange("5", "15"), InetAddressAndPort.getByName("127.0.0.1"));
-        pendingRangeMaps.addPendingRange(genRange("15", "25"), InetAddressAndPort.getByName("127.0.0.2"));
-        pendingRangeMaps.addPendingRange(genRange("25", "35"), InetAddressAndPort.getByName("127.0.0.3"));
-        pendingRangeMaps.addPendingRange(genRange("35", "45"), InetAddressAndPort.getByName("127.0.0.4"));
-        pendingRangeMaps.addPendingRange(genRange("45", "55"), InetAddressAndPort.getByName("127.0.0.5"));
-        pendingRangeMaps.addPendingRange(genRange("45", "65"), InetAddressAndPort.getByName("127.0.0.6"));
+        addPendingRange(pendingRangeMaps, genRange("5", "15"), "127.0.0.1");
+        addPendingRange(pendingRangeMaps, genRange("15", "25"), "127.0.0.2");
+        addPendingRange(pendingRangeMaps, genRange("25", "35"), "127.0.0.3");
+        addPendingRange(pendingRangeMaps, genRange("35", "45"), "127.0.0.4");
+        addPendingRange(pendingRangeMaps, genRange("45", "55"), "127.0.0.5");
+        addPendingRange(pendingRangeMaps, genRange("45", "65"), "127.0.0.6");
 
         assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("0")).size());
         assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("5")).size());
@@ -61,8 +72,8 @@ public class PendingRangeMapsTest {
         assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("55")).size());
         assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("65")).size());
 
-        Collection<InetAddressAndPort> endpoints = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15"));
-        assertTrue(endpoints.contains(InetAddressAndPort.getByName("127.0.0.1")));
+        EndpointsForToken replicas = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15"));
+        assertTrue(replicas.endpoints().contains(InetAddressAndPort.getByName("127.0.0.1")));
     }
 
     @Test
@@ -70,13 +81,13 @@ public class PendingRangeMapsTest {
     {
         PendingRangeMaps pendingRangeMaps = new PendingRangeMaps();
 
-        pendingRangeMaps.addPendingRange(genRange("5", "15"), InetAddressAndPort.getByName("127.0.0.1"));
-        pendingRangeMaps.addPendingRange(genRange("15", "25"), InetAddressAndPort.getByName("127.0.0.2"));
-        pendingRangeMaps.addPendingRange(genRange("25", "35"), InetAddressAndPort.getByName("127.0.0.3"));
-        pendingRangeMaps.addPendingRange(genRange("35", "45"), InetAddressAndPort.getByName("127.0.0.4"));
-        pendingRangeMaps.addPendingRange(genRange("45", "55"), InetAddressAndPort.getByName("127.0.0.5"));
-        pendingRangeMaps.addPendingRange(genRange("45", "65"), InetAddressAndPort.getByName("127.0.0.6"));
-        pendingRangeMaps.addPendingRange(genRange("65", "7"), InetAddressAndPort.getByName("127.0.0.7"));
+        addPendingRange(pendingRangeMaps, genRange("5", "15"), "127.0.0.1");
+        addPendingRange(pendingRangeMaps, genRange("15", "25"), "127.0.0.2");
+        addPendingRange(pendingRangeMaps, genRange("25", "35"), "127.0.0.3");
+        addPendingRange(pendingRangeMaps, genRange("35", "45"), "127.0.0.4");
+        addPendingRange(pendingRangeMaps, genRange("45", "55"), "127.0.0.5");
+        addPendingRange(pendingRangeMaps, genRange("45", "65"), "127.0.0.6");
+        addPendingRange(pendingRangeMaps, genRange("65", "7"), "127.0.0.7");
 
         assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("0")).size());
         assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("5")).size());
@@ -90,8 +101,8 @@ public class PendingRangeMapsTest {
         assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("55")).size());
         assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("65")).size());
 
-        Collection<InetAddressAndPort> endpoints = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("6"));
-        assertTrue(endpoints.contains(InetAddressAndPort.getByName("127.0.0.1")));
-        assertTrue(endpoints.contains(InetAddressAndPort.getByName("127.0.0.7")));
+        EndpointsForToken replicas = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("6"));
+        assertTrue(replicas.endpoints().contains(InetAddressAndPort.getByName("127.0.0.1")));
+        assertTrue(replicas.endpoints().contains(InetAddressAndPort.getByName("127.0.0.7")));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
new file mode 100644
index 0000000..66eff23
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
+import org.apache.cassandra.utils.FBUtilities;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+
+public class ReplicaCollectionTest
+{
+
+    static final InetAddressAndPort EP1, EP2, EP3, EP4, EP5, BROADCAST_EP, NULL_EP;
+    static final Range<Token> R1, R2, R3, R4, R5, BROADCAST_RANGE, NULL_RANGE;
+
+    static
+    {
+        try
+        {
+            EP1 = InetAddressAndPort.getByName("127.0.0.1");
+            EP2 = InetAddressAndPort.getByName("127.0.0.2");
+            EP3 = InetAddressAndPort.getByName("127.0.0.3");
+            EP4 = InetAddressAndPort.getByName("127.0.0.4");
+            EP5 = InetAddressAndPort.getByName("127.0.0.5");
+            BROADCAST_EP = FBUtilities.getBroadcastAddressAndPort();
+            NULL_EP = InetAddressAndPort.getByName("127.255.255.255");
+            R1 = range(0, 1);
+            R2 = range(1, 2);
+            R3 = range(2, 3);
+            R4 = range(3, 4);
+            R5 = range(4, 5);
+            BROADCAST_RANGE = range(10, 11);
+            NULL_RANGE = range(10000, 10001);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    static Token tk(long t)
+    {
+        return new Murmur3Partitioner.LongToken(t);
+    }
+
+    static Range<Token> range(long left, long right)
+    {
+        return new Range<>(tk(left), tk(right));
+    }
+
+    static class TestCase<C extends AbstractReplicaCollection<C>>
+    {
+        final C test;
+        final List<Replica> canonicalList;
+        final Multimap<InetAddressAndPort, Replica> canonicalByEndpoint;
+        final Multimap<Range<Token>, Replica> canonicalByRange;
+
+        TestCase(C test, List<Replica> canonicalList)
+        {
+            this.test = test;
+            this.canonicalList = canonicalList;
+            this.canonicalByEndpoint = HashMultimap.create();
+            this.canonicalByRange = HashMultimap.create();
+            for (Replica replica : canonicalList)
+                canonicalByEndpoint.put(replica.endpoint(), replica);
+            for (Replica replica : canonicalList)
+                canonicalByRange.put(replica.range(), replica);
+        }
+
+        void testSize()
+        {
+            Assert.assertEquals(canonicalList.size(), test.size());
+        }
+
+        void testEquals()
+        {
+            Assert.assertTrue(Iterables.elementsEqual(canonicalList, test));
+        }
+
+        void testEndpoints()
+        {
+            // TODO: we should do more exhaustive tests of the collection
+            Assert.assertEquals(ImmutableSet.copyOf(canonicalByEndpoint.keySet()), ImmutableSet.copyOf(test.endpoints()));
+            try
+            {
+                test.endpoints().add(EP5);
+                Assert.fail();
+            } catch (UnsupportedOperationException e) {}
+            try
+            {
+                test.endpoints().remove(EP5);
+                Assert.fail();
+            } catch (UnsupportedOperationException e) {}
+
+            Assert.assertTrue(test.endpoints().containsAll(canonicalByEndpoint.keySet()));
+            for (InetAddressAndPort ep : canonicalByEndpoint.keySet())
+                Assert.assertTrue(test.endpoints().contains(ep));
+            for (InetAddressAndPort ep : ImmutableList.of(EP1, EP2, EP3, EP4, EP5, BROADCAST_EP))
+                if (!canonicalByEndpoint.containsKey(ep))
+                    Assert.assertFalse(test.endpoints().contains(ep));
+        }
+
+        public void testOrderOfIteration()
+        {
+            Assert.assertEquals(canonicalList, ImmutableList.copyOf(test));
+            Assert.assertEquals(canonicalList, test.stream().collect(Collectors.toList()));
+            Assert.assertEquals(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), test.endpoints());
+        }
+
+        void testSelect(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+        {
+            TestCase<C> allMatchZeroCapacity = new TestCase<>(test.select().add(Predicates.alwaysTrue(), 0).get(), Collections.emptyList()); // every replica matches but the second argument is 0: expected result is empty
+            allMatchZeroCapacity.testAll(subListDepth, filterDepth, sortDepth, selectDepth - 1);
+
+            TestCase<C> noMatchFullCapacity = new TestCase<>(test.select().add(Predicates.alwaysFalse(), canonicalList.size()).get(), Collections.emptyList()); // nothing matches: expected result is empty whatever the second argument is
+            noMatchFullCapacity.testAll(subListDepth, filterDepth, sortDepth,selectDepth - 1);
+
+            if (canonicalList.size() <= 2) // the reordering scenario below reads canonicalList.get(2), so it needs at least three elements
+                return;
+
+            List<Replica> newOrderList = ImmutableList.of(canonicalList.get(2), canonicalList.get(1), canonicalList.get(0)); // first three canonical replicas, in reverse order
+            TestCase<C> newOrder = new TestCase<>(
+                    test.select()
+                            .add(r -> r == newOrderList.get(0), 3) // each predicate matches exactly one replica, by identity
+                            .add(r -> r == newOrderList.get(1), 3)
+                            .add(r -> r == newOrderList.get(2), 3)
+                            .get(), newOrderList // expected order follows the order of the add() calls, not the order in the source collection
+            );
+            newOrder.testAll(subListDepth, filterDepth, sortDepth,selectDepth - 1); // every scenario recurses with selectDepth - 1 so the recursion terminates
+        }
+
+        private void assertSubList(C subCollection, int from, int to)
+        {
+            Assert.assertTrue(subCollection.isSnapshot);
+            if (from == to)
+            {
+                Assert.assertTrue(subCollection.isEmpty());
+            }
+            else
+            {
+                List<Replica> subList = this.test.list.subList(from, to);
+                if (test.isSnapshot)
+                    Assert.assertSame(subList.getClass(), subCollection.list.getClass());
+                Assert.assertEquals(subList, subCollection.list);
+            }
+        }
+
+        void testSubList(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+        {
+            if (test.isSnapshot)
+                Assert.assertSame(test, test.subList(0, test.size()));
+
+            if (test.isEmpty())
+                return;
+
+            TestCase<C> skipFront = new TestCase<>(test.subList(1, test.size()), canonicalList.subList(1, canonicalList.size()));
+            assertSubList(skipFront.test, 1, canonicalList.size());
+            skipFront.testAll(subListDepth - 1, filterDepth, sortDepth, selectDepth);
+            TestCase<C> skipBack = new TestCase<>(test.subList(0, test.size() - 1), canonicalList.subList(0, canonicalList.size() - 1));
+            assertSubList(skipBack.test, 0, canonicalList.size() - 1);
+            skipBack.testAll(subListDepth - 1, filterDepth, sortDepth, selectDepth);
+        }
+
+        void testFilter(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+        {
+            if (test.isSnapshot)
+                Assert.assertSame(test, test.filter(Predicates.alwaysTrue()));
+
+            if (test.isEmpty())
+                return;
+            // remove start
+            // we recurse on the same subset in testSubList, so just corroborate we have the correct list here
+            assertSubList(test.filter(r -> r != canonicalList.get(0)), 1, canonicalList.size());
+
+            if (test.size() <= 1)
+                return;
+            // remove end
+            // we recurse on the same subset in testSubList, so just corroborate we have the correct list here
+            assertSubList(test.filter(r -> r != canonicalList.get(canonicalList.size() - 1)), 0, canonicalList.size() - 1);
+
+            if (test.size() <= 2)
+                return;
+            Predicate<Replica> removeMiddle = r -> r != canonicalList.get(canonicalList.size() / 2);
+            TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), ImmutableList.copyOf(Iterables.filter(canonicalList, removeMiddle::test)));
+            filtered.testAll(subListDepth, filterDepth - 1, sortDepth, selectDepth);
+        }
+
+        void testContains()
+        {
+            for (Replica replica : canonicalList)
+                Assert.assertTrue(test.contains(replica));
+            Assert.assertFalse(test.contains(fullReplica(NULL_EP, NULL_RANGE)));
+        }
+
+        void testGet()
+        {
+            for (int i = 0 ; i < canonicalList.size() ; ++i)
+                Assert.assertEquals(canonicalList.get(i), test.get(i));
+        }
+
+        void testSort(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+        {
+            final Comparator<Replica> comparator = (o1, o2) ->
+            {
+                boolean f1 = o1 == canonicalList.get(0); // identity check: is o1 the first canonical replica?
+                boolean f2 = o2 == canonicalList.get(0); // identity check: is o2 the first canonical replica?
+                return f1 == f2 ? 0 : f1 ? 1 : -1; // the first canonical replica compares greater than any other; all other pairs compare equal
+            };
+            TestCase<C> sorted = new TestCase<>(test.sorted(comparator), ImmutableList.sortedCopyOf(comparator, canonicalList)); // collection under test and expected list are sorted with the same comparator
+            sorted.testAll(subListDepth, filterDepth, sortDepth - 1, selectDepth); // recurse with one less level of sort depth
+        }
+
+        private void testAll(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+        {
+            testEndpoints();
+            testOrderOfIteration();
+            testContains();
+            testGet();
+            testEquals();
+            testSize();
+            if (subListDepth > 0)
+                testSubList(subListDepth, filterDepth, sortDepth, selectDepth);
+            if (filterDepth > 0)
+                testFilter(subListDepth, filterDepth, sortDepth, selectDepth);
+            if (sortDepth > 0)
+                testSort(subListDepth, filterDepth, sortDepth, selectDepth);
+            if (selectDepth > 0)
+                testSelect(subListDepth, filterDepth, sortDepth, selectDepth);
+        }
+
+        public void testAll()
+        {
+            testAll(2, 2, 2, 2);
+        }
+    }
+
+    static class RangesAtEndpointTestCase extends TestCase<RangesAtEndpoint>
+    {
+        RangesAtEndpointTestCase(RangesAtEndpoint test, List<Replica> canonicalList)
+        {
+            super(test, canonicalList);
+        }
+
+        void testRanges()
+        {
+            Assert.assertEquals(ImmutableSet.copyOf(canonicalByRange.keySet()), ImmutableSet.copyOf(test.ranges()));
+            try
+            {
+                test.ranges().add(R5);
+                Assert.fail();
+            } catch (UnsupportedOperationException e) {}
+            try
+            {
+                test.ranges().remove(R5);
+                Assert.fail();
+            } catch (UnsupportedOperationException e) {}
+
+            Assert.assertTrue(test.ranges().containsAll(canonicalByRange.keySet()));
+            for (Range<Token> range : canonicalByRange.keySet())
+                Assert.assertTrue(test.ranges().contains(range));
+            for (Range<Token> range : ImmutableList.of(R1, R2, R3, R4, R5, BROADCAST_RANGE))
+                if (!canonicalByRange.containsKey(range))
+                    Assert.assertFalse(test.ranges().contains(range));
+        }
+
+        @Override
+        public void testOrderOfIteration()
+        {
+            super.testOrderOfIteration();
+            Assert.assertEquals(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::range)), test.ranges());
+        }
+
+        @Override
+        public void testAll()
+        {
+            super.testAll();
+            testRanges();
+        }
+    }
+
+    private static final ImmutableList<Replica> RANGES_AT_ENDPOINT = ImmutableList.of(
+            fullReplica(EP1, R1),
+            fullReplica(EP1, R2),
+            transientReplica(EP1, R3),
+            fullReplica(EP1, R4),
+            transientReplica(EP1, R5)
+    );
+
+    @Test
+    public void testRangesAtEndpoint()
+    {
+        ImmutableList<Replica> canonical = RANGES_AT_ENDPOINT;
+        new RangesAtEndpointTestCase(
+                RangesAtEndpoint.copyOf(canonical), canonical
+        ).testAll();
+    }
+
+    @Test
+    public void testMutableRangesAtEndpoint()
+    {
+        ImmutableList<Replica> canonical1 = RANGES_AT_ENDPOINT.subList(0, RANGES_AT_ENDPOINT.size()); // NOTE(review): the EndpointsForRange/EndpointsForToken variants of this test use size() - 1 here; with the full list canonical1 equals canonical2 below - confirm intent
+        RangesAtEndpoint.Mutable test = new RangesAtEndpoint.Mutable(RANGES_AT_ENDPOINT.get(0).endpoint(), canonical1.size());
+        test.addAll(canonical1, Conflict.NONE);
+        try
+        {   // re-adding replicas that are already present is expected to fail under Conflict.NONE
+            test.addAll(canonical1, Conflict.NONE);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        test.addAll(canonical1, Conflict.DUPLICATE); // we ignore exact duplicates
+        try
+        {   // invalid endpoint (EP2, while the collection was created for EP1); always error
+            test.add(fullReplica(EP2, BROADCAST_RANGE), Conflict.ALL);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        try
+        {   // conflict on isFull/isTransient: R3 is already present as a transient replica
+            test.add(fullReplica(EP1, R3), Conflict.DUPLICATE);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        test.add(fullReplica(EP1, R3), Conflict.ALL); // expected not to throw; the contents must still equal canonical1 (checked on the next statement)
+
+        new RangesAtEndpointTestCase(test, canonical1).testAll();
+
+        RangesAtEndpoint view = test.asImmutableView(); // expected to reflect later additions to 'test' (compared against canonical2 below)
+        RangesAtEndpoint snapshot = view.subList(0, view.size()); // expected to keep the contents as of now (compared against canonical1 below)
+
+        ImmutableList<Replica> canonical2 = RANGES_AT_ENDPOINT;
+        test.addAll(canonical2.reverse(), Conflict.DUPLICATE); // replicas already present are ignored, so existing order is expected to be preserved
+        new TestCase<>(snapshot, canonical1).testAll();
+        new TestCase<>(view, canonical2).testAll();
+        new TestCase<>(test, canonical2).testAll();
+    }
+
+    private static final ImmutableList<Replica> ENDPOINTS_FOR_X = ImmutableList.of(
+            fullReplica(EP1, R1),
+            fullReplica(EP2, R1),
+            transientReplica(EP3, R1),
+            fullReplica(EP4, R1),
+            transientReplica(EP5, R1)
+    );
+
+    @Test
+    public void testEndpointsForRange()
+    {
+        ImmutableList<Replica> canonical = ENDPOINTS_FOR_X;
+        new TestCase<>(
+                EndpointsForRange.copyOf(canonical), canonical
+        ).testAll();
+    }
+
+    @Test
+    public void testMutableEndpointsForRange()
+    {
+        ImmutableList<Replica> canonical1 = ENDPOINTS_FOR_X.subList(0, ENDPOINTS_FOR_X.size() - 1); // all but the last replica; the last one is added later to tell view and snapshot apart
+        EndpointsForRange.Mutable test = new EndpointsForRange.Mutable(R1, canonical1.size());
+        test.addAll(canonical1, Conflict.NONE);
+        try
+        {   // re-adding replicas that are already present is expected to fail under Conflict.NONE
+            test.addAll(canonical1, Conflict.NONE);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        test.addAll(canonical1, Conflict.DUPLICATE); // we ignore exact duplicates
+        try
+        {   // incorrect range (R2, while the collection was created for R1); rejected even with Conflict.ALL
+            test.add(fullReplica(BROADCAST_EP, R2), Conflict.ALL);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        try
+        {   // conflict on isFull/isTransient: EP1 is already present as a full replica
+            test.add(transientReplica(EP1, R1), Conflict.DUPLICATE);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        test.add(transientReplica(EP1, R1), Conflict.ALL); // expected not to throw; the contents must still equal canonical1 (checked on the next statement)
+
+        new TestCase<>(test, canonical1).testAll();
+
+        EndpointsForRange view = test.asImmutableView(); // expected to reflect later additions to 'test' (compared against canonical2 below)
+        EndpointsForRange snapshot = view.subList(0, view.size()); // expected to keep the contents as of now (compared against canonical1 below)
+
+        ImmutableList<Replica> canonical2 = ENDPOINTS_FOR_X;
+        test.addAll(canonical2.reverse(), Conflict.DUPLICATE); // only the one missing replica is new; existing order is expected to be preserved
+        new TestCase<>(snapshot, canonical1).testAll();
+        new TestCase<>(view, canonical2).testAll();
+        new TestCase<>(test, canonical2).testAll();
+    }
+
+    @Test
+    public void testEndpointsForToken()
+    {
+        ImmutableList<Replica> canonical = ENDPOINTS_FOR_X;
+        new TestCase<>(
+                EndpointsForToken.copyOf(tk(1), canonical), canonical
+        ).testAll();
+    }
+
+    @Test
+    public void testMutableEndpointsForToken()
+    {
+        ImmutableList<Replica> canonical1 = ENDPOINTS_FOR_X.subList(0, ENDPOINTS_FOR_X.size() - 1); // all but the last replica; the last one is added later to tell view and snapshot apart
+        EndpointsForToken.Mutable test = new EndpointsForToken.Mutable(tk(1), canonical1.size());
+        test.addAll(canonical1, Conflict.NONE);
+        try
+        {   // re-adding replicas that are already present is expected to fail under Conflict.NONE
+            test.addAll(canonical1, Conflict.NONE);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        test.addAll(canonical1, Conflict.DUPLICATE); // we ignore exact duplicates
+        try
+        {   // incorrect range (R2, whereas every accepted replica here uses R1); rejected even with Conflict.ALL
+            test.add(fullReplica(BROADCAST_EP, R2), Conflict.ALL);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        try
+        {   // conflict on isFull/isTransient: EP1 is already present as a full replica
+            test.add(transientReplica(EP1, R1), Conflict.DUPLICATE);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        test.add(transientReplica(EP1, R1), Conflict.ALL); // expected not to throw; the contents must still equal canonical1 (checked on the next statement)
+
+        new TestCase<>(test, canonical1).testAll();
+
+        EndpointsForToken view = test.asImmutableView(); // expected to reflect later additions to 'test' (compared against canonical2 below)
+        EndpointsForToken snapshot = view.subList(0, view.size()); // expected to keep the contents as of now (compared against canonical1 below)
+
+        ImmutableList<Replica> canonical2 = ENDPOINTS_FOR_X;
+        test.addAll(canonical2.reverse(), Conflict.DUPLICATE); // only the one missing replica is new; existing order is expected to be preserved
+        new TestCase<>(snapshot, canonical1).testAll();
+        new TestCase<>(view, canonical2).testAll();
+        new TestCase<>(test, canonical2).testAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
new file mode 100644
index 0000000..66f538f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+
+public class ReplicaUtils
+{
+    public static final Range<Token> FULL_RANGE = new Range<>(Murmur3Partitioner.MINIMUM, Murmur3Partitioner.MINIMUM);
+    public static final AbstractBounds<PartitionPosition> FULL_BOUNDS = new Range<>(Murmur3Partitioner.MINIMUM.minKeyBound(), Murmur3Partitioner.MINIMUM.maxKeyBound());
+
+    public static Replica full(InetAddressAndPort endpoint)
+    {
+        return fullReplica(endpoint, FULL_RANGE);
+    }
+
+    public static Replica trans(InetAddressAndPort endpoint)
+    {
+        return transientReplica(endpoint, FULL_RANGE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java b/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java
new file mode 100644
index 0000000..a0427db
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.Gossiper;
+
+/**
+ * Unit tests for {@link ReplicationFactor#fromString(String)}, covering both the plain
+ * ("3") and the total/transient ("3/1") notations.
+ */
+public class ReplicationFactorTest
+{
+
+    /**
+     * Initialises the daemon configuration, switches transient replication on and starts the
+     * gossiper before any replication factor is parsed.
+     */
+    @BeforeClass
+    public static void setupClass()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+        Gossiper.instance.start(1);
+    }
+
+    /**
+     * Asserts that parsing {@code s} is rejected with an {@link IllegalArgumentException}.
+     *
+     * @param s the replication factor string that is expected to be invalid
+     */
+    private static void assertRfParseFailure(String s)
+    {
+        try
+        {
+            ReplicationFactor.fromString(s);
+            // Name the offending input so a failure shows which case stopped being rejected.
+            Assert.fail("Expected IllegalArgumentException for replication factor '" + s + "'");
+        }
+        catch (IllegalArgumentException e)
+        {
+            // expected
+        }
+    }
+
+    /**
+     * Asserts that {@code s} parses to the given total and transient replica counts, and that
+     * the full replica count equals the difference between the two.
+     *
+     * @param s                the replication factor string to parse
+     * @param expectedReplicas expected value of {@code allReplicas}
+     * @param expectedTrans    expected value of {@code transientReplicas()}
+     */
+    private static void assertRfParse(String s, int expectedReplicas, int expectedTrans)
+    {
+        ReplicationFactor rf = ReplicationFactor.fromString(s);
+        Assert.assertEquals(expectedReplicas, rf.allReplicas);
+        Assert.assertEquals(expectedTrans, rf.transientReplicas());
+        Assert.assertEquals(expectedReplicas - expectedTrans, rf.fullReplicas);
+    }
+
+    /** Checks valid plain and transient notations, then inputs that must be rejected. */
+    @Test
+    public void parseTest()
+    {
+        assertRfParse("3", 3, 0);
+        assertRfParse("3/1", 3, 1);
+
+        assertRfParse("5", 5, 0);
+        assertRfParse("5/2", 5, 2);
+
+        // rejected: negative count, transient count equal to the total, transient count above it
+        assertRfParseFailure("-1");
+        assertRfParseFailure("3/3");
+        assertRfParseFailure("3/4");
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
index a8caa72..e6a9365 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
@@ -17,9 +17,7 @@
  */
 package org.apache.cassandra.locator;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
@@ -75,7 +73,7 @@ public class ReplicationStrategyEndpointCacheTest
     public void runEndpointsWereCachedTest(Class stratClass, Map<String, String> configOptions) throws Exception
     {
         setup(stratClass, configOptions);
-        assert strategy.getNaturalEndpoints(searchToken).equals(strategy.getNaturalEndpoints(searchToken));
+        assert strategy.getNaturalReplicasForToken(searchToken).equals(strategy.getNaturalReplicasForToken(searchToken));
     }
 
     @Test
@@ -89,34 +87,34 @@ public class ReplicationStrategyEndpointCacheTest
     public void runCacheRespectsTokenChangesTest(Class stratClass, Map<String, String> configOptions) throws Exception
     {
         setup(stratClass, configOptions);
-        ArrayList<InetAddressAndPort> initial;
-        ArrayList<InetAddressAndPort> endpoints;
+        EndpointsForToken initial;
+        EndpointsForToken replicas;
 
-        endpoints = strategy.getNaturalEndpoints(searchToken);
-        assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
+        replicas = strategy.getNaturalReplicasForToken(searchToken);
+        assert replicas.size() == 5 : StringUtils.join(replicas, ",");
 
         // test token addition, in DC2 before existing token
-        initial = strategy.getNaturalEndpoints(searchToken);
+        initial = strategy.getNaturalReplicasForToken(searchToken);
         tmd.updateNormalToken(new BigIntegerToken(String.valueOf(35)), InetAddressAndPort.getByName("127.0.0.5"));
-        endpoints = strategy.getNaturalEndpoints(searchToken);
-        assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
-        assert !endpoints.equals(initial);
+        replicas = strategy.getNaturalReplicasForToken(searchToken);
+        assert replicas.size() == 5 : StringUtils.join(replicas, ",");
+        assert !replicas.equals(initial);
 
         // test token removal, newly created token
-        initial = strategy.getNaturalEndpoints(searchToken);
+        initial = strategy.getNaturalReplicasForToken(searchToken);
         tmd.removeEndpoint(InetAddressAndPort.getByName("127.0.0.5"));
-        endpoints = strategy.getNaturalEndpoints(searchToken);
-        assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
-        assert !endpoints.contains(InetAddressAndPort.getByName("127.0.0.5"));
-        assert !endpoints.equals(initial);
+        replicas = strategy.getNaturalReplicasForToken(searchToken);
+        assert replicas.size() == 5 : StringUtils.join(replicas, ",");
+        assert !replicas.endpoints().contains(InetAddressAndPort.getByName("127.0.0.5"));
+        assert !replicas.equals(initial);
 
         // test token change
-        initial = strategy.getNaturalEndpoints(searchToken);
+        initial = strategy.getNaturalReplicasForToken(searchToken);
         //move .8 after search token but before other DC3
         tmd.updateNormalToken(new BigIntegerToken(String.valueOf(25)), InetAddressAndPort.getByName("127.0.0.8"));
-        endpoints = strategy.getNaturalEndpoints(searchToken);
-        assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
-        assert !endpoints.equals(initial);
+        replicas = strategy.getNaturalReplicasForToken(searchToken);
+        assert replicas.size() == 5 : StringUtils.join(replicas, ",");
+        assert !replicas.equals(initial);
     }
 
     protected static class FakeSimpleStrategy extends SimpleStrategy
@@ -128,11 +126,13 @@ public class ReplicationStrategyEndpointCacheTest
             super(keyspaceName, tokenMetadata, snitch, configOptions);
         }
 
-        public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+        // Trips if invoked again once `called` is set: repeat lookups should come from the cache.
+        @Override
+        public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
         {
-            assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
+            assert !called : "calculateNaturalReplicas was already called, result should have been cached";
             called = true;
-            return super.calculateNaturalEndpoints(token, metadata);
+            return super.calculateNaturalReplicas(token, metadata);
         }
     }
 
@@ -145,11 +145,13 @@ public class ReplicationStrategyEndpointCacheTest
             super(keyspaceName, tokenMetadata, snitch, configOptions);
         }
 
-        public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+        // Trips if invoked again once `called` is set: repeat lookups should come from the cache.
+        @Override
+        public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
         {
-            assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
+            assert !called : "calculateNaturalReplicas was already called, result should have been cached";
             called = true;
-            return super.calculateNaturalEndpoints(token, metadata);
+            return super.calculateNaturalReplicas(token, metadata);
         }
     }
 
@@ -162,11 +164,13 @@ public class ReplicationStrategyEndpointCacheTest
             super(keyspaceName, tokenMetadata, snitch, configOptions);
         }
 
-        public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+        // Trips if invoked again once `called` is set: repeat lookups should come from the cache.
+        @Override
+        public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
         {
-            assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
+            assert !called : "calculateNaturalReplicas was already called, result should have been cached";
             called = true;
-            return super.calculateNaturalEndpoints(token, metadata);
+            return super.calculateNaturalReplicas(token, metadata);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
index fe77b0e..1e0c152 100644
--- a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
@@ -19,14 +19,22 @@ package org.apache.cassandra.locator;
 
 import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.IPartitioner;
@@ -53,6 +61,7 @@ public class SimpleStrategyTest
     {
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1));
+        DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
     }
 
     @Test
@@ -107,12 +116,12 @@ public class SimpleStrategyTest
 
             for (int i = 0; i < keyTokens.length; i++)
             {
-                List<InetAddressAndPort> endpoints = strategy.getNaturalEndpoints(keyTokens[i]);
-                assertEquals(strategy.getReplicationFactor(), endpoints.size());
+                EndpointsForToken replicas = strategy.getNaturalReplicasForToken(keyTokens[i]);
+                assertEquals(strategy.getReplicationFactor().allReplicas, replicas.size());
                 List<InetAddressAndPort> correctEndpoints = new ArrayList<>();
-                for (int j = 0; j < endpoints.size(); j++)
+                for (int j = 0; j < replicas.size(); j++)
                     correctEndpoints.add(hosts.get((i + j + 1) % hosts.size()));
-                assertEquals(new HashSet<>(correctEndpoints), new HashSet<>(endpoints));
+                assertEquals(new HashSet<>(correctEndpoints), replicas.endpoints());
             }
         }
     }
@@ -154,30 +163,87 @@ public class SimpleStrategyTest
 
             PendingRangeCalculatorService.calculatePendingRanges(strategy, keyspaceName);
 
-            int replicationFactor = strategy.getReplicationFactor();
+            int replicationFactor = strategy.getReplicationFactor().allReplicas;
 
             for (int i = 0; i < keyTokens.length; i++)
             {
-                Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens[i], keyspaceName, strategy.getNaturalEndpoints(keyTokens[i]));
-                assertTrue(endpoints.size() >= replicationFactor);
+                EndpointsForToken replicas = tmd.getWriteEndpoints(keyTokens[i], keyspaceName, strategy.getNaturalReplicasForToken(keyTokens[i]));
+                assertTrue(replicas.size() >= replicationFactor);
 
                 for (int j = 0; j < replicationFactor; j++)
                 {
                     //Check that the old nodes are definitely included
-                    assertTrue(endpoints.contains(hosts.get((i + j + 1) % hosts.size())));
+                    assertTrue(replicas.endpoints().contains(hosts.get((i + j + 1) % hosts.size())));
                 }
 
                 // bootstrapEndpoint should be in the endpoints for i in MAX-RF to MAX, but not in any earlier ep.
                 if (i < RING_SIZE - replicationFactor)
-                    assertFalse(endpoints.contains(bootstrapEndpoint));
+                    assertFalse(replicas.endpoints().contains(bootstrapEndpoint));
                 else
-                    assertTrue(endpoints.contains(bootstrapEndpoint));
+                    assertTrue(replicas.endpoints().contains(bootstrapEndpoint));
             }
         }
 
         StorageServiceAccessor.setTokenMetadata(oldTmd);
     }
 
+    /** Wraps {@code t} in a {@link Murmur3Partitioner.LongToken}. */
+    private static Token tk(long t)
+    {
+        return new Murmur3Partitioner.LongToken(t);
+    }
+
+    /** Builds the range whose left bound is {@code tk(l)} and right bound is {@code tk(r)}. */
+    private static Range<Token> range(long l, long r)
+    {
+        return new Range<>(tk(l), tk(r));
+    }
+
+    /**
+     * With four endpoints owning tokens 100, 200, 300 and 400 and a replication factor of "3/1",
+     * checks that a token lookup yields two full replicas and one transient replica.
+     */
+    @Test
+    public void transientReplica() throws Exception
+    {
+        IEndpointSnitch snitch = new SimpleSnitch();
+        DatabaseDescriptor.setEndpointSnitch(snitch);
+
+        List<InetAddressAndPort> endpoints = Lists.newArrayList(InetAddressAndPort.getByName("127.0.0.1"),
+                                                                InetAddressAndPort.getByName("127.0.0.2"),
+                                                                InetAddressAndPort.getByName("127.0.0.3"),
+                                                                InetAddressAndPort.getByName("127.0.0.4"));
+
+        Multimap<InetAddressAndPort, Token> tokens = HashMultimap.create();
+        tokens.put(endpoints.get(0), tk(100));
+        tokens.put(endpoints.get(1), tk(200));
+        tokens.put(endpoints.get(2), tk(300));
+        tokens.put(endpoints.get(3), tk(400));
+        TokenMetadata metadata = new TokenMetadata();
+        metadata.updateNormalTokens(tokens);
+
+        Map<String, String> configOptions = new HashMap<>();
+        configOptions.put("replication_factor", "3/1");
+
+        SimpleStrategy strategy = new SimpleStrategy("ks", metadata, snitch, configOptions);
+
+        // token 99: expect range(400, 100) with endpoints 0 and 1 full, endpoint 2 transient
+        Range<Token> range1 = range(400, 100);
+        Assert.assertEquals(EndpointsForToken.of(range1.right,
+                                                 Replica.fullReplica(endpoints.get(0), range1),
+                                                 Replica.fullReplica(endpoints.get(1), range1),
+                                                 Replica.transientReplica(endpoints.get(2), range1)),
+                            strategy.getNaturalReplicasForToken(tk(99)));
+
+        // token 101: expect range(100, 200) with endpoints 1 and 2 full, endpoint 3 transient
+        Range<Token> range2 = range(100, 200);
+        Assert.assertEquals(EndpointsForToken.of(range2.right,
+                                                 Replica.fullReplica(endpoints.get(1), range2),
+                                                 Replica.fullReplica(endpoints.get(2), range2),
+                                                 Replica.transientReplica(endpoints.get(3), range2)),
+                            strategy.getNaturalReplicasForToken(tk(101)));
+    }
+
     private AbstractReplicationStrategy getStrategy(String keyspaceName, TokenMetadata tmd)
     {
         KeyspaceMetadata ksmd = Schema.instance.getKeyspaceMetadata(keyspaceName);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
index b589d2d..ae8c011 100644
--- a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
+++ b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
@@ -118,7 +118,7 @@ public class TokenMetadataTest
             }
 
             @Override
-            public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
+            public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2)
             {
                 return 0;
             }
@@ -165,7 +165,7 @@ public class TokenMetadataTest
             }
 
             @Override
-            public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
+            public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2)
             {
                 return 0;
             }
@@ -216,7 +216,7 @@ public class TokenMetadataTest
             }
 
             @Override
-            public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
+            public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2)
             {
                 return 0;
             }
@@ -263,7 +263,7 @@ public class TokenMetadataTest
             }
 
             @Override
-            public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
+            public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2)
             {
                 return 0;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
index d97cdb8..e226d32 100644
--- a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
+++ b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
@@ -25,19 +25,20 @@ import org.junit.Test;
 
 import org.junit.Assert;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.BufferDecoratedKey;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.net.MessagingService.Verb;
 import org.apache.cassandra.schema.MockSchema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+
 public class WriteCallbackInfoTest
 {
     @BeforeClass
@@ -65,7 +66,7 @@ public class WriteCallbackInfoTest
                          ? new Commit(UUID.randomUUID(), new PartitionUpdate.Builder(metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, RegularAndStaticColumns.NONE, 1).build())
                          : new Mutation(PartitionUpdate.simpleBuilder(metadata, "").build());
 
-        WriteCallbackInfo wcbi = new WriteCallbackInfo(InetAddressAndPort.getByName("192.168.1.1"), null, new MessageOut(verb, payload, null), null, cl, allowHints);
+        WriteCallbackInfo wcbi = new WriteCallbackInfo(full(InetAddressAndPort.getByName("192.168.1.1")), null, new MessageOut(verb, payload, null), null, cl, allowHints);
         Assert.assertEquals(expectHint, wcbi.shouldHint());
         if (expectHint)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
index 6a8dc83..379031c 100644
--- a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
+++ b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.AbstractEndpointSnitch;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.MessagingServiceTest;
@@ -172,7 +173,7 @@ public class OutboundMessagingConnectionTest
             return nodeToDc.get(endpoint);
         }
 
-        public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
+        public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2)
         {
             return 0;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
deleted file mode 100644
index 802a673..0000000
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.repair;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import com.google.common.collect.Lists;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Murmur3Partitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.streaming.StreamPlan;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MerkleTree;
-import org.apache.cassandra.utils.MerkleTrees;
-import org.apache.cassandra.utils.UUIDGen;
-
-import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class LocalSyncTaskTest extends AbstractRepairTest
-{
-    private static final IPartitioner partitioner = Murmur3Partitioner.instance;
-    public static final String KEYSPACE1 = "DifferencerTest";
-    public static final String CF_STANDARD = "Standard1";
-    public static ColumnFamilyStore cfs;
-
-    @BeforeClass
-    public static void defineSchema()
-    {
-        SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE1,
-                                    KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
-
-        TableId tid = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD).id;
-        cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
-    }
-
-    /**
-     * When there is no difference between two, LocalSyncTask should return stats with 0 difference.
-     */
-    @Test
-    public void testNoDifference() throws Throwable
-    {
-        final InetAddressAndPort ep1 = InetAddressAndPort.getByName("127.0.0.1");
-        final InetAddressAndPort ep2 = InetAddressAndPort.getByName("127.0.0.1");
-
-        Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
-        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
-
-        MerkleTrees tree1 = createInitialTree(desc);
-
-        MerkleTrees tree2 = createInitialTree(desc);
-
-        // difference the trees
-        // note: we reuse the same endpoint which is bogus in theory but fine here
-        TreeResponse r1 = new TreeResponse(ep1, tree1);
-        TreeResponse r2 = new TreeResponse(ep2, tree2);
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE);
-        task.run();
-
-        assertEquals(0, task.get().numberOfDifferences);
-    }
-
-    @Test
-    public void testDifference() throws Throwable
-    {
-        Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
-        UUID parentRepairSession = UUID.randomUUID();
-        Keyspace keyspace = Keyspace.open(KEYSPACE1);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
-        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddressAndPort(),
-                                                                 Arrays.asList(cfs), Arrays.asList(range), false,
-                                                                 ActiveRepairService.UNREPAIRED_SSTABLE, false,
-                                                                 PreviewKind.NONE);
-
-        RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
-
-        MerkleTrees tree1 = createInitialTree(desc);
-
-        MerkleTrees tree2 = createInitialTree(desc);
-
-        // change a range in one of the trees
-        Token token = partitioner.midpoint(range.left, range.right);
-        tree1.invalidate(token);
-        MerkleTree.TreeRange changed = tree1.get(token);
-        changed.hash("non-empty hash!".getBytes());
-
-        Set<Range<Token>> interesting = new HashSet<>();
-        interesting.add(changed);
-
-        // difference the trees
-        // note: we reuse the same endpoint which is bogus in theory but fine here
-        TreeResponse r1 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.1"), tree1);
-        TreeResponse r2 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2);
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE);
-        task.run();
-
-        // ensure that the changed range was recorded
-        assertEquals("Wrong differing ranges", interesting.size(), task.getCurrentStat().numberOfDifferences);
-    }
-
-    @Test
-    public void fullRepairStreamPlan() throws Exception
-    {
-        UUID sessionID = registerSession(cfs, true, true);
-        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
-        RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
-
-        TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
-        TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
-
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE);
-        StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1));
-
-        assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair());
-        assertTrue(plan.getFlushBeforeTransfer());
-    }
-
-    @Test
-    public void incrementalRepairStreamPlan() throws Exception
-    {
-        UUID sessionID = registerSession(cfs, true, true);
-        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
-        RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
-
-        TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
-        TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
-
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, false, PreviewKind.NONE);
-        StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1));
-
-        assertEquals(desc.parentSessionId, plan.getPendingRepair());
-        assertFalse(plan.getFlushBeforeTransfer());
-    }
-
-    private MerkleTrees createInitialTree(RepairJobDesc desc, IPartitioner partitioner)
-    {
-        MerkleTrees tree = new MerkleTrees(partitioner);
-        tree.addMerkleTrees((int) Math.pow(2, 15), desc.ranges);
-        tree.init();
-        for (MerkleTree.TreeRange r : tree.invalids())
-        {
-            r.ensureHashInitialised();
-        }
-        return tree;
-    }
-
-    private MerkleTrees createInitialTree(RepairJobDesc desc)
-    {
-        return createInitialTree(desc, partitioner);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
index 2044106..418d7de 100644
--- a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair;
 
-import java.net.InetAddress;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -29,7 +28,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.repair.RepairRunnable.CommonRange;
 
 import static org.apache.cassandra.repair.RepairRunnable.filterCommonRanges;
 
@@ -41,7 +39,7 @@ public class RepairRunnableTest extends AbstractRepairTest
     @Test
     public void filterCommonIncrementalRangesNotForced() throws Exception
     {
-        CommonRange cr = new CommonRange(PARTICIPANTS, ALL_RANGES);
+        CommonRange cr = new CommonRange(PARTICIPANTS, Collections.emptySet(), ALL_RANGES);
 
         List<CommonRange> expected = Lists.newArrayList(cr);
         List<CommonRange> actual = filterCommonRanges(expected, Collections.emptySet(), false);
@@ -52,13 +50,13 @@ public class RepairRunnableTest extends AbstractRepairTest
     @Test
     public void forceFilterCommonIncrementalRanges() throws Exception
     {
-        CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2));
-        CommonRange cr2 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3), Sets.newHashSet(RANGE3));
+        CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1, RANGE2));
+        CommonRange cr2 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE3));
         Set<InetAddressAndPort> liveEndpoints = Sets.newHashSet(PARTICIPANT2, PARTICIPANT3); // PARTICIPANT1 is excluded
 
         List<CommonRange> initial = Lists.newArrayList(cr1, cr2);
-        List<CommonRange> expected = Lists.newArrayList(new CommonRange(Sets.newHashSet(PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2)),
-                                                        new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Sets.newHashSet(RANGE3)));
+        List<CommonRange> expected = Lists.newArrayList(new CommonRange(Sets.newHashSet(PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1, RANGE2)),
+                                                        new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE3)));
         List<CommonRange> actual = filterCommonRanges(initial, liveEndpoints, true);
 
         Assert.assertEquals(expected, actual);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index 54f0511..e77d657 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.repair;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
@@ -62,9 +63,10 @@ public class RepairSessionTest
         IPartitioner p = Murmur3Partitioner.instance;
         Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100)));
         Set<InetAddressAndPort> endpoints = Sets.newHashSet(remote);
-        RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange),
+        RepairSession session = new RepairSession(parentSessionId, sessionId,
+                                                  new CommonRange(endpoints, Collections.emptySet(), Arrays.asList(repairRange)),
                                                   "Keyspace1", RepairParallelism.SEQUENTIAL,
-                                                  endpoints, false, false, false,
+                                                  false, false, false,
                                                   PreviewKind.NONE, false, "Standard1");
 
         // perform convict

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java
new file mode 100644
index 0000000..92ae172
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.StreamCoordinator;
+import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link SymmetricLocalSyncTask}: counting differences between two merkle tree
+ * responses and checking the stream plans the task creates.
+ */
+public class SymmetricLocalSyncTaskTest extends AbstractRepairTest
+{
+    private static final IPartitioner partitioner = Murmur3Partitioner.instance;
+    public static final String KEYSPACE1 = "DifferencerTest";
+    public static final String CF_STANDARD = "Standard1";
+    public static ColumnFamilyStore cfs;
+
+    /**
+     * Prepares the server, creates the test keyspace with a single standard table, and caches
+     * that table's {@link ColumnFamilyStore} in {@link #cfs}.
+     */
+    @BeforeClass
+    public static void defineSchema()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
+
+        TableId tid = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD).id;
+        cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
+    }
+
+    /**
+     * When there is no difference between the two trees, SymmetricLocalSyncTask should return stats with 0 differences.
+     */
+    @Test
+    public void testNoDifference() throws Throwable
+    {
+        final InetAddressAndPort ep1 = InetAddressAndPort.getByName("127.0.0.1");
+        final InetAddressAndPort ep2 = InetAddressAndPort.getByName("127.0.0.1");
+
+        Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
+        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, CF_STANDARD, Arrays.asList(range));
+
+        MerkleTrees tree1 = createInitialTree(desc);
+        MerkleTrees tree2 = createInitialTree(desc);
+
+        // difference the trees
+        // note: we reuse the same endpoint which is bogus in theory but fine here
+        TreeResponse r1 = new TreeResponse(ep1, tree1);
+        TreeResponse r2 = new TreeResponse(ep2, tree2);
+        SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE);
+        task.run();
+
+        assertEquals(0, task.get().numberOfDifferences);
+    }
+
+    /**
+     * When one range of the first tree is given a different hash, the task should record exactly
+     * that one differing range.
+     */
+    @Test
+    public void testDifference() throws Throwable
+    {
+        Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
+        UUID parentRepairSession = UUID.randomUUID();
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        // named "store" so it does not shadow the static cfs field
+        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF_STANDARD);
+
+        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddressAndPort(),
+                                                                 Arrays.asList(store), Arrays.asList(range), false,
+                                                                 ActiveRepairService.UNREPAIRED_SSTABLE, false,
+                                                                 PreviewKind.NONE);
+
+        RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARD, Arrays.asList(range));
+
+        MerkleTrees tree1 = createInitialTree(desc);
+        MerkleTrees tree2 = createInitialTree(desc);
+
+        // change a range in one of the trees
+        Token token = partitioner.midpoint(range.left, range.right);
+        tree1.invalidate(token);
+        MerkleTree.TreeRange changed = tree1.get(token);
+        // explicit charset so the hashed bytes do not depend on the platform default
+        changed.hash("non-empty hash!".getBytes(StandardCharsets.UTF_8));
+
+        Set<Range<Token>> interesting = new HashSet<>();
+        interesting.add(changed);
+
+        // difference the trees; here the two responses are attributed to different endpoints
+        TreeResponse r1 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.1"), tree1);
+        TreeResponse r2 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2);
+        SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE);
+
+        // Lower the connection limits only while the task runs, then restore whatever was set before rather than
+        // assuming the defaults (presumably lowered so the test does not wait long on the second endpoint -- TODO confirm).
+        final int previousMaxConnectAttempts = DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS;
+        final long previousMaxWaitTimeNanos = DefaultConnectionFactory.MAX_WAIT_TIME_NANOS;
+        DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 1;
+        DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(2);
+        try
+        {
+            task.run();
+        }
+        finally
+        {
+            DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = previousMaxWaitTimeNanos;
+            DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = previousMaxConnectAttempts;
+        }
+
+        // ensure that the changed range was recorded
+        assertEquals("Wrong differing ranges", interesting.size(), task.getCurrentStat().numberOfDifferences);
+    }
+
+    /**
+     * A task created with {@code NO_PENDING_REPAIR} should build a stream plan that has no pending
+     * repair id and flushes before transfer.
+     */
+    @Test
+    public void fullRepairStreamPlan() throws Exception
+    {
+        UUID sessionID = registerSession(cfs, true, true);
+        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
+        RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
+
+        TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+        TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+
+        SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE);
+        StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1));
+
+        assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair());
+        assertTrue(plan.getFlushBeforeTransfer());
+    }
+
+    /**
+     * Asserts that the plan's only stream session has the expected number of requests (incoming)
+     * and transfers (outgoing).
+     */
+    private static void assertNumInOut(StreamPlan plan, int expectedIncoming, int expectedOutgoing)
+    {
+        StreamCoordinator coordinator = plan.getCoordinator();
+        StreamSession session = Iterables.getOnlyElement(coordinator.getAllStreamSessions());
+        assertEquals(expectedIncoming, session.getNumRequests());
+        assertEquals(expectedOutgoing, session.getNumTransfers());
+    }
+
+    /**
+     * A task created with the parent session id as its pending repair should build a stream plan
+     * that carries that id, does not flush before transfer, and has one request and one transfer.
+     */
+    @Test
+    public void incrementalRepairStreamPlan() throws Exception
+    {
+        UUID sessionID = registerSession(cfs, true, true);
+        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
+        RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
+
+        TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+        TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+
+        SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, desc.parentSessionId, false, PreviewKind.NONE);
+        StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1));
+
+        assertEquals(desc.parentSessionId, plan.getPendingRepair());
+        assertFalse(plan.getFlushBeforeTransfer());
+        assertNumInOut(plan, 1, 1);
+    }
+
+    /**
+     * Don't reciprocate streams if the other endpoint is a transient replica
+     */
+    @Test
+    public void transientStreamPlan()
+    {
+        UUID sessionID = registerSession(cfs, true, true);
+        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
+        RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
+
+        TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+        TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+
+        // the fourth constructor argument is true only in this test (presumably "remote is transient" -- TODO confirm)
+        SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, true, desc.parentSessionId, false, PreviewKind.NONE);
+        StreamPlan plan = task.createStreamPlan(PARTICIPANT2, Lists.newArrayList(RANGE1));
+        assertNumInOut(plan, 1, 0);
+    }
+
+    /**
+     * Builds merkle trees over {@code desc.ranges} with the given partitioner and makes sure every
+     * range returned by {@code invalids()} has its hash initialised.
+     */
+    private MerkleTrees createInitialTree(RepairJobDesc desc, IPartitioner partitioner)
+    {
+        MerkleTrees tree = new MerkleTrees(partitioner);
+        tree.addMerkleTrees(1 << 15, desc.ranges);
+        tree.init();
+        for (MerkleTree.TreeRange r : tree.invalids())
+        {
+            r.ensureHashInitialised();
+        }
+        return tree;
+    }
+
+    /**
+     * Same as {@link #createInitialTree(RepairJobDesc, IPartitioner)}, using this class's partitioner.
+     */
+    private MerkleTrees createInitialTree(RepairJobDesc desc)
+    {
+        return createInitialTree(desc, partitioner);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
new file mode 100644
index 0000000..06f968f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * Checks which message {@link SymmetricRemoteSyncTask} sends, and to whom, when a sync is started.
+ */
+public class SymmetricRemoteSyncTaskTest extends AbstractRepairTest
+{
+    private static final RepairJobDesc DESC = new RepairJobDesc(UUIDGen.getTimeUUID(), UUIDGen.getTimeUUID(), "ks", "tbl", ALL_RANGES);
+    private static final List<Range<Token>> RANGE_LIST = ImmutableList.of(RANGE1);
+
+    /**
+     * Task subclass that records the request passed to {@code sendRequest} and its destination
+     * instead of sending anything.
+     */
+    private static class InstrumentedSymmetricRemoteSyncTask extends SymmetricRemoteSyncTask
+    {
+        RepairMessage sentMessage = null;
+        InetAddressAndPort sentTo = null;
+
+        public InstrumentedSymmetricRemoteSyncTask(InetAddressAndPort first, InetAddressAndPort second)
+        {
+            super(DESC, new TreeResponse(first, null), new TreeResponse(second, null), PreviewKind.NONE);
+        }
+
+        /**
+         * Captures the request and target; fails if a request was already captured or either argument is null.
+         */
+        @Override
+        void sendRequest(RepairMessage message, InetAddressAndPort target)
+        {
+            Assert.assertNull(sentMessage);
+            Assert.assertNotNull(message);
+            Assert.assertNotNull(target);
+            sentMessage = message;
+            sentTo = target;
+        }
+    }
+
+    /**
+     * Starting a sync should send a {@link SyncRequest} addressed to the first participant.
+     */
+    @Test
+    public void normalSync()
+    {
+        InstrumentedSymmetricRemoteSyncTask task = new InstrumentedSymmetricRemoteSyncTask(PARTICIPANT1, PARTICIPANT2);
+        task.startSync(RANGE_LIST);
+
+        RepairMessage sent = task.sentMessage;
+        Assert.assertNotNull(sent);
+        Assert.assertSame(SyncRequest.class, sent.getClass());
+        Assert.assertEquals(PARTICIPANT1, task.sentTo);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
index 3ea888d..a7e8272 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
@@ -44,12 +44,13 @@ public class LocalSessionAccessor
         ARS.consistent.local.putSessionUnsafe(session);
     }
 
-    public static void finalizeUnsafe(UUID sessionID)
+    public static long finalizeUnsafe(UUID sessionID)
     {
         LocalSession session = ARS.consistent.local.getSession(sessionID);
         assert session != null;
         session.setState(ConsistentSession.State.FINALIZED);
         ARS.consistent.local.save(session);
+        return session.repairedAt;
     }
 
     public static void failUnsafe(UUID sessionID)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index d368510..e387c41 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -40,10 +40,9 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.apache.cassandra.repair.AbstractRepairTest;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.KeyspaceRepairManager;
@@ -136,7 +135,11 @@ public class LocalSessionTest extends AbstractRepairTest
         boolean prepareSessionCalled = false;
 
         @Override
-        ListenableFuture prepareSession(KeyspaceRepairManager repairManager, UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor)
+        ListenableFuture prepareSession(KeyspaceRepairManager repairManager,
+                                        UUID sessionID,
+                                        Collection<ColumnFamilyStore> tables,
+                                        RangesAtEndpoint ranges,
+                                        ExecutorService executor)
         {
             prepareSessionCalled = true;
             if (prepareSessionFuture != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/schema/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/MockSchema.java b/test/unit/org/apache/cassandra/schema/MockSchema.java
index 98bf9ca..7a6b011 100644
--- a/test/unit/org/apache/cassandra/schema/MockSchema.java
+++ b/test/unit/org/apache/cassandra/schema/MockSchema.java
@@ -127,7 +127,7 @@ public class MockSchema
         }
         SerializationHeader header = SerializationHeader.make(cfs.metadata(), Collections.emptyList());
         StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata().comparator)
-                                                 .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, null, header)
+                                                 .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, null, false, header)
                                                  .get(MetadataType.STATS);
         SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata,
                                                           RANDOM_ACCESS_READER_FACTORY.sharedCopy(), RANDOM_ACCESS_READER_FACTORY.sharedCopy(), indexSummary.sharedCopy(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org