Posted to commits@cassandra.apache.org by if...@apache.org on 2018/09/26 09:43:15 UTC

[1/2] cassandra git commit: Transient replication: range movement improvements

Repository: cassandra
Updated Branches:
  refs/heads/trunk 210da3dc0 -> 0379201c7


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 8ae6853..2f412ad 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -105,7 +105,6 @@ public class BootStrapperTest
         InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1");
 
         assertEquals(numOldNodes, tmd.sortedTokens().size());
-        RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), false, 1);
         IFailureDetector mockFailureDetector = new IFailureDetector()
         {
             public boolean isAlive(InetAddressAndPort ep)
@@ -120,26 +119,20 @@ public class BootStrapperTest
             public void remove(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
             public void forceConviction(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
         };
-        s.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(mockFailureDetector));
+        RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), mockFailureDetector, false, 1);
         assertNotNull(Keyspace.open(keyspaceName));
         s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint));
 
 
-        Collection<Multimap<InetAddressAndPort, FetchReplica>> toFetch = s.toFetch().get(keyspaceName);
+        Multimap<InetAddressAndPort, FetchReplica> toFetch = s.toFetch().get(keyspaceName);
 
         // Check we get RF new ranges in total
-        long rangesCount = toFetch.stream()
-               .map(Multimap::values)
-               .flatMap(Collection::stream)
-               .map(f -> f.remote)
-               .map(Replica::range)
-               .count();
-        assertEquals(replicationFactor, rangesCount);
+        assertEquals(replicationFactor, toFetch.size());
 
         // there isn't any point in testing the size of these collections for any specific size.  When a random partitioner
         // is used, they will vary.
-        assert toFetch.stream().map(Multimap::values).flatMap(Collection::stream).count() > 0;
-        assert toFetch.stream().map(Multimap::keySet).map(Collection::stream).noneMatch(myEndpoint::equals);
+        assert toFetch.values().size() > 0;
+        assert toFetch.keys().stream().noneMatch(myEndpoint::equals);
         return s;
     }
 

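Read together, the BootStrapperTest hunks above show the shape of the new RangeStreamer API: the failure detector is handed straight to the constructor (no separately registered FailureDetectorSourceFilter), and toFetch() now yields a flat Multimap per keyspace. A rough sketch of the resulting test flow, reusing names from the diff (tmd, myEndpoint, myToken, keyspaceName and the always-alive mockFailureDetector are all defined in the test, not here):

    // Sketch only -- it mirrors the '+' lines above, it is not additional patch content.
    RangeStreamer streamer = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP,
                                               true, DatabaseDescriptor.getEndpointSnitch(),
                                               new StreamStateStore(), mockFailureDetector,
                                               false, 1);
    streamer.addRanges(keyspaceName,
                       Keyspace.open(keyspaceName).getReplicationStrategy()
                               .getPendingAddressRanges(tmd, myToken, myEndpoint));

    Multimap<InetAddressAndPort, FetchReplica> toFetch = streamer.toFetch().get(keyspaceName);
    assertEquals(replicationFactor, toFetch.size());              // RF new ranges in total
    assert toFetch.keys().stream().noneMatch(myEndpoint::equals); // never fetch from ourselves
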
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
index 07d6377..cee4bb9 100644
--- a/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
+++ b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
@@ -195,18 +195,26 @@ public class RangeFetchMapCalculatorTest
         addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
 
         //Return false for all except 127.0.0.5
-        final Predicate<Replica> filter = replica ->
+        final RangeStreamer.SourceFilter filter = new RangeStreamer.SourceFilter()
         {
-            try
+            public boolean apply(Replica replica)
             {
-                if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.5")))
-                    return false;
-                else
+                try
+                {
+                    if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.5")))
+                        return false;
+                    else
+                        return true;
+                }
+                catch (UnknownHostException e)
+                {
                     return true;
+                }
             }
-            catch (UnknownHostException e)
+
+            public String message(Replica replica)
             {
-                return true;
+                return "Doesn't match 127.0.0.5";
             }
         };
 
@@ -230,7 +238,18 @@ public class RangeFetchMapCalculatorTest
         addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
         addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
 
-        final Predicate<Replica> allDeadFilter = replica -> false;
+        final RangeStreamer.SourceFilter allDeadFilter = new RangeStreamer.SourceFilter()
+        {
+            public boolean apply(Replica replica)
+            {
+                return false;
+            }
+
+            public String message(Replica replica)
+            {
+                return "All dead";
+            }
+        };
 
         RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Arrays.asList(allDeadFilter), "Test");
         calculator.getRangeFetchMap();
@@ -263,18 +282,26 @@ public class RangeFetchMapCalculatorTest
         addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59");
 
         //Reject only 127.0.0.3 and accept everyone else
-        final Predicate<Replica> localHostFilter = replica ->
+        final RangeStreamer.SourceFilter localHostFilter = new RangeStreamer.SourceFilter()
         {
-            try
+            public boolean apply(Replica replica)
             {
-                if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3")))
-                    return false;
-                else
+                try
+                {
+                    if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3")))
+                        return false;
+                    else
+                        return true;
+                }
+                catch (UnknownHostException e)
+                {
                     return true;
+                }
             }
-            catch (UnknownHostException e)
+
+            public String message(Replica replica)
             {
-                return true;
+                return "Not 127.0.0.3";
             }
         };
 
@@ -318,18 +345,26 @@ public class RangeFetchMapCalculatorTest
         // and a trivial one:
         addTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3");
 
-        Predicate<Replica> filter = replica ->
+        RangeStreamer.SourceFilter filter = new RangeStreamer.SourceFilter()
         {
-            try
+            public boolean apply(Replica replica)
             {
-                if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3")))
-                    return false;
+                try
+                {
+                    if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3")))
+                        return false;
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                return true;
             }
-            catch (UnknownHostException e)
+
+            public String message(Replica replica)
             {
-                throw new RuntimeException(e);
+                return "Not 127.0.0.3";
             }
-            return true;
         };
         RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.singleton(filter), "Test");
         Multimap<InetAddressAndPort, Range<Token>> optMap = calculator.getRangeFetchMapForNonTrivialRanges();

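The three anonymous classes above all repeat the same pattern: the new RangeStreamer.SourceFilter interface pairs apply(Replica), which decides whether a replica may be used as a stream source, with message(Replica), which supplies the explanation reported when a source is rejected. A hypothetical helper (not part of the patch) that would collapse these test filters into one factory method:

    // Hypothetical convenience factory, assuming only the two SourceFilter methods shown above.
    static RangeStreamer.SourceFilter rejectEndpoint(final InetAddressAndPort rejected)
    {
        return new RangeStreamer.SourceFilter()
        {
            public boolean apply(Replica replica)
            {
                // Accept every source except the one endpoint we want to exclude
                return !replica.endpoint().equals(rejected);
            }

            public String message(Replica replica)
            {
                return "Rejected " + replica + ": sources on " + rejected + " are excluded";
            }
        };
    }

    // e.g. RangeStreamer.SourceFilter filter = rejectEndpoint(InetAddressAndPort.getByName("127.0.0.5"));
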
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
index 4afeb5a..23d585f 100644
--- a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.RangeRelocator;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.Pair;
 
@@ -368,7 +369,7 @@ public class OldNetworkTopologyStrategyTest
         RangesAtEndpoint currentRanges = strategy.getAddressReplicas().get(movingNode);
         RangesAtEndpoint updatedRanges = strategy.getPendingAddressRanges(tokenMetadataAfterMove, tokensAfterMove[movingNodeIdx], movingNode);
 
-        return asRanges(StorageService.calculateStreamAndFetchRanges(currentRanges, updatedRanges));
+        return asRanges(RangeRelocator.calculateStreamAndFetchRanges(currentRanges, updatedRanges));
     }
 
     private static Map<String, String> optsWithRF(int rf)

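The only change in this file is the helper's new home: calculateStreamAndFetchRanges now lives on RangeRelocator instead of StorageService. Its contract is unchanged, as exercised by MoveTransientTest below; a minimal sketch of the call, assuming a node's current and post-move replica sets are already in hand:

    // Sketch: left holds the ranges the moving node must stream to others,
    // right holds the ranges it must fetch (see the assertions in MoveTransientTest).
    Pair<RangesAtEndpoint, RangesAtEndpoint> result =
        RangeRelocator.calculateStreamAndFetchRanges(currentRanges, updatedRanges);
    RangesAtEndpoint toStream = result.left;
    RangesAtEndpoint toFetch  = result.right;
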
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java b/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java
index 63973ea..0ee1f81 100644
--- a/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java
+++ b/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service;
 
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -47,6 +48,7 @@ import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaCollection;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 import static org.apache.cassandra.locator.Replica.fullReplica;
@@ -60,29 +62,59 @@ import static org.apache.cassandra.service.StorageServiceTest.assertMultimapEqua
  */
 public class BootstrapTransientTest
 {
-    static InetAddressAndPort aAddress;
-    static InetAddressAndPort bAddress;
-    static InetAddressAndPort cAddress;
-    static InetAddressAndPort dAddress;
+    static InetAddressAndPort address02;
+    static InetAddressAndPort address03;
+    static InetAddressAndPort address04;
+    static InetAddressAndPort address05;
 
     @BeforeClass
     public static void setUpClass() throws Exception
     {
-        aAddress = InetAddressAndPort.getByName("127.0.0.1");
-        bAddress = InetAddressAndPort.getByName("127.0.0.2");
-        cAddress = InetAddressAndPort.getByName("127.0.0.3");
-        dAddress = InetAddressAndPort.getByName("127.0.0.4");
+        address02 = InetAddressAndPort.getByName("127.0.0.2");
+        address03 = InetAddressAndPort.getByName("127.0.0.3");
+        address04 = InetAddressAndPort.getByName("127.0.0.4");
+        address05 = InetAddressAndPort.getByName("127.0.0.5");
     }
 
     private final List<InetAddressAndPort> downNodes = new ArrayList<>();
-    Predicate<Replica> alivePredicate = replica -> !downNodes.contains(replica.endpoint());
+
+    final RangeStreamer.SourceFilter alivePredicate = new RangeStreamer.SourceFilter()
+    {
+        public boolean apply(Replica replica)
+        {
+            return !downNodes.contains(replica.endpoint());
+        }
+
+        public String message(Replica replica)
+        {
+            return "Down nodes: " + downNodes;
+        }
+    };
+
+    final RangeStreamer.SourceFilter sourceFilterDownNodesPredicate = new RangeStreamer.SourceFilter()
+    {
+        public boolean apply(Replica replica)
+        {
+            return !sourceFilterDownNodes.contains(replica.endpoint());
+        }
+
+        public String message(Replica replica)
+        {
+            return "Source filter down nodes" + sourceFilterDownNodes;
+        }
+    };
 
     private final List<InetAddressAndPort> sourceFilterDownNodes = new ArrayList<>();
-    private final Collection<Predicate<Replica>> sourceFilters = Collections.singleton(replica -> !sourceFilterDownNodes.contains(replica.endpoint()));
+
+    private final Collection<RangeStreamer.SourceFilter> sourceFilters = Arrays.asList(alivePredicate,
+                                                                                       sourceFilterDownNodesPredicate,
+                                                                                       new RangeStreamer.ExcludeLocalNodeFilter()
+                                                                                       );
 
     @After
     public void clearDownNode()
     {
+        // TODO: actually use these
         downNodes.clear();
         sourceFilterDownNodes.clear();
     }
@@ -93,27 +125,43 @@ public class BootstrapTransientTest
         DatabaseDescriptor.daemonInitialization();
     }
 
-    Token tenToken = new OrderPreservingPartitioner.StringToken("00010");
+    Token tenToken    = new OrderPreservingPartitioner.StringToken("00010");
     Token twentyToken = new OrderPreservingPartitioner.StringToken("00020");
     Token thirtyToken = new OrderPreservingPartitioner.StringToken("00030");
     Token fourtyToken = new OrderPreservingPartitioner.StringToken("00040");
 
-    Range<Token> aRange = new Range<>(thirtyToken, tenToken);
-    Range<Token> bRange = new Range<>(tenToken, twentyToken);
-    Range<Token> cRange = new Range<>(twentyToken, thirtyToken);
-    Range<Token> dRange = new Range<>(thirtyToken, fourtyToken);
+    Range<Token> range30_10 = new Range<>(thirtyToken, tenToken);
+    Range<Token> range10_20 = new Range<>(tenToken, twentyToken);
+    Range<Token> range20_30 = new Range<>(twentyToken, thirtyToken);
+    Range<Token> range30_40 = new Range<>(thirtyToken, fourtyToken);
+
+    RangesAtEndpoint toFetch = RangesAtEndpoint.of(new Replica(address05, range30_40, true),
+                                                   new Replica(address05, range20_30, true),
+                                                   new Replica(address05, range10_20, false));
 
-    RangesAtEndpoint toFetch = RangesAtEndpoint.of(new Replica(dAddress, dRange, true),
-                                                   new Replica(dAddress, cRange, true),
-                                                   new Replica(dAddress, bRange, false));
 
+
+    public EndpointsForRange endpoints(Replica... replicas)
+    {
+        assert replicas.length > 0;
+
+        Range<Token> range = replicas[0].range();
+        EndpointsForRange.Builder builder = EndpointsForRange.builder(range);
+        for (Replica r : replicas)
+        {
+            assert r.range().equals(range);
+            builder.add(r);
+        }
+
+        return builder.build();
+    }
     @Test
     public void testRangeStreamerRangesToFetch() throws Exception
     {
         EndpointsByReplica expectedResult = new EndpointsByReplica(ImmutableMap.of(
-        fullReplica(dAddress, dRange), EndpointsForRange.builder(aRange).add(fullReplica(bAddress, aRange)).add(transientReplica(cAddress, aRange)).build(),
-        fullReplica(dAddress, cRange), EndpointsForRange.builder(cRange).add(fullReplica(cAddress, cRange)).add(transientReplica(bAddress, cRange)).build(),
-        transientReplica(dAddress, bRange), EndpointsForRange.builder(bRange).add(transientReplica(aAddress, bRange)).build()));
+        transientReplica(address05, range10_20), endpoints(transientReplica(address02, range10_20)),
+        fullReplica(address05, range20_30), endpoints(transientReplica(address03, range20_30), fullReplica(address04, range20_30)),
+        fullReplica(address05, range30_40), endpoints(transientReplica(address04, range30_10), fullReplica(address02, range30_10))));
 
         invokeCalculateRangesToFetchWithPreferredEndpoints(toFetch, constructTMDs(), expectedResult);
     }
@@ -121,11 +169,11 @@ public class BootstrapTransientTest
     private Pair<TokenMetadata, TokenMetadata> constructTMDs()
     {
         TokenMetadata tmd = new TokenMetadata();
-        tmd.updateNormalToken(aRange.right, aAddress);
-        tmd.updateNormalToken(bRange.right, bAddress);
-        tmd.updateNormalToken(cRange.right, cAddress);
+        tmd.updateNormalToken(range30_10.right, address02);
+        tmd.updateNormalToken(range10_20.right, address03);
+        tmd.updateNormalToken(range20_30.right, address04);
         TokenMetadata updated = tmd.cloneOnlyTokenMap();
-        updated.updateNormalToken(dRange.right, dAddress);
+        updated.updateNormalToken(range30_40.right, address05);
 
         return Pair.create(tmd, updated);
     }
@@ -137,14 +185,13 @@ public class BootstrapTransientTest
         DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
 
         EndpointsByReplica result = RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) -> replicas,
-                                                                                                                   simpleStrategy(tmds.left),
-                                                                                                                   toFetch,
-                                                                                                                   true,
-                                                                                                                   tmds.left,
-                                                                                                                   tmds.right,
-                                                                                                                   alivePredicate,
-                                                                                                                   "OldNetworkTopologyStrategyTest",
-                                                                                                                   sourceFilters);
+                                                                                               simpleStrategy(tmds.left),
+                                                                                               toFetch,
+                                                                                               true,
+                                                                                               tmds.left,
+                                                                                               tmds.right,
+                                                                                               "OldNetworkTopologyStrategyTest",
+                                                                                               sourceFilters);
         result.asMap().forEach((replica, list) -> System.out.printf("Replica %s, sources %s%n", replica, list));
         assertMultimapEqualsIgnoreOrder(expectedResult, result);
 

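Note how the explicit alivePredicate argument disappears from calculateRangesToFetchWithPreferredEndpoints: liveness, operator-supplied source filters and the exclude-local-node rule are all expressed as RangeStreamer.SourceFilters now and passed in a single collection, and a rejected source surfaces the filter's message() text in the resulting IllegalStateException (which is what the updated assertions in MoveTransientTest check). A sketch of that composition, using the filter fields declared earlier in this test:

    // All filtering concerns travel through one collection now; the standalone
    // liveness predicate parameter is gone.
    Collection<RangeStreamer.SourceFilter> filters = Arrays.asList(alivePredicate,
                                                                   sourceFilterDownNodesPredicate,
                                                                   new RangeStreamer.ExcludeLocalNodeFilter());
    // RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(...) then receives
    // `filters` as its final argument, as shown in the hunk above.
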
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/service/MoveTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTransientTest.java b/test/unit/org/apache/cassandra/service/MoveTransientTest.java
index 1e24735..e5a63c7 100644
--- a/test/unit/org/apache/cassandra/service/MoveTransientTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTransientTest.java
@@ -20,11 +20,12 @@ package org.apache.cassandra.service;
 
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
-import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
 import org.apache.cassandra.locator.EndpointsByReplica;
 import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.apache.cassandra.locator.RangesByEndpoint;
@@ -64,27 +65,56 @@ public class MoveTransientTest
 {
     private static final Logger logger = LoggerFactory.getLogger(MoveTransientTest.class);
 
-    static InetAddressAndPort aAddress;
-    static InetAddressAndPort bAddress;
-    static InetAddressAndPort cAddress;
-    static InetAddressAndPort dAddress;
-    static InetAddressAndPort eAddress;
+    static InetAddressAndPort address01;
+    static InetAddressAndPort address02;
+    static InetAddressAndPort address03;
+    static InetAddressAndPort address04;
+    static InetAddressAndPort address05;
 
     @BeforeClass
     public static void setUpClass() throws Exception
     {
-        aAddress = InetAddressAndPort.getByName("127.0.0.1");
-        bAddress = InetAddressAndPort.getByName("127.0.0.2");
-        cAddress = InetAddressAndPort.getByName("127.0.0.3");
-        dAddress = InetAddressAndPort.getByName("127.0.0.4");
-        eAddress = InetAddressAndPort.getByName("127.0.0.5");
+        address01 = InetAddressAndPort.getByName("127.0.0.1");
+        address02 = InetAddressAndPort.getByName("127.0.0.2");
+        address03 = InetAddressAndPort.getByName("127.0.0.3");
+        address04 = InetAddressAndPort.getByName("127.0.0.4");
+        address05 = InetAddressAndPort.getByName("127.0.0.5");
     }
 
-    private final List<InetAddressAndPort> downNodes = new ArrayList();
-    Predicate<Replica> alivePredicate = replica -> !downNodes.contains(replica.endpoint());
+    private final List<InetAddressAndPort> downNodes = new ArrayList<>();
+
+    final RangeStreamer.SourceFilter alivePredicate = new RangeStreamer.SourceFilter()
+    {
+        public boolean apply(Replica replica)
+        {
+            return !downNodes.contains(replica.endpoint());
+        }
+
+        public String message(Replica replica)
+        {
+            return "Down nodes: " + downNodes;
+        }
+    };
+
+    final RangeStreamer.SourceFilter sourceFilterDownNodesPredicate = new RangeStreamer.SourceFilter()
+    {
+        public boolean apply(Replica replica)
+        {
+            return !sourceFilterDownNodes.contains(replica.endpoint());
+        }
+
+        public String message(Replica replica)
+        {
+            return "Source filter down nodes: " + sourceFilterDownNodes;
+        }
+    };
 
     private final List<InetAddressAndPort> sourceFilterDownNodes = new ArrayList<>();
-    private final Collection<Predicate<Replica>> sourceFilters = Collections.singleton(replica -> !sourceFilterDownNodes.contains(replica.endpoint()));
+
+    private final Collection<RangeStreamer.SourceFilter> sourceFilters = Arrays.asList(alivePredicate,
+                                                                                       sourceFilterDownNodesPredicate,
+                                                                                       new RangeStreamer.ExcludeLocalNodeFilter()
+    );
 
     @After
     public void clearDownNode()
@@ -99,27 +129,36 @@ public class MoveTransientTest
         DatabaseDescriptor.daemonInitialization();
     }
 
-    Token oneToken = new RandomPartitioner.BigIntegerToken("1");
-    Token twoToken = new RandomPartitioner.BigIntegerToken("2");
-    Token threeToken = new RandomPartitioner.BigIntegerToken("3");
-    Token fourToken = new RandomPartitioner.BigIntegerToken("4");
-    Token sixToken = new RandomPartitioner.BigIntegerToken("6");
-    Token sevenToken = new RandomPartitioner.BigIntegerToken("7");
-    Token nineToken = new RandomPartitioner.BigIntegerToken("9");
-    Token elevenToken = new RandomPartitioner.BigIntegerToken("11");
-    Token fourteenToken = new RandomPartitioner.BigIntegerToken("14");
+    final Token oneToken = new RandomPartitioner.BigIntegerToken("1");
+    final Token twoToken = new RandomPartitioner.BigIntegerToken("2");
+    final Token threeToken = new RandomPartitioner.BigIntegerToken("3");
+    final Token fourToken = new RandomPartitioner.BigIntegerToken("4");
+    final Token sixToken = new RandomPartitioner.BigIntegerToken("6");
+    final Token sevenToken = new RandomPartitioner.BigIntegerToken("7");
+    final Token nineToken = new RandomPartitioner.BigIntegerToken("9");
+    final Token elevenToken = new RandomPartitioner.BigIntegerToken("11");
+    final Token fourteenToken = new RandomPartitioner.BigIntegerToken("14");
 
-    Range<Token> aRange = new Range(oneToken, threeToken);
-    Range<Token> bRange = new Range(threeToken, sixToken);
-    Range<Token> cRange = new Range(sixToken, nineToken);
-    Range<Token> dRange = new Range(nineToken, elevenToken);
-    Range<Token> eRange = new Range(elevenToken, oneToken);
+    final Range<Token> range_1_2 = new Range(oneToken, threeToken);
+    final Range<Token> range_3_6 = new Range(threeToken, sixToken);
+    final Range<Token> range_6_9 = new Range(sixToken, nineToken);
+    final Range<Token> range_9_11 = new Range(nineToken, elevenToken);
+    final Range<Token> range_11_1 = new Range(elevenToken, oneToken);
 
 
-    RangesAtEndpoint current = RangesAtEndpoint.of(new Replica(aAddress, aRange, true),
-                                       new Replica(aAddress, eRange, true),
-                                       new Replica(aAddress, dRange, false));
+    final RangesAtEndpoint current = RangesAtEndpoint.of(new Replica(address01, range_1_2, true),
+                                                         new Replica(address01, range_11_1, true),
+                                                         new Replica(address01, range_9_11, false));
+
+    public Token token(String s)
+    {
+        return new RandomPartitioner.BigIntegerToken(s);
+    }
 
+    public Range<Token> range(String start, String end)
+    {
+        return new Range<>(token(start), token(end));
+    }
 
     /**
      * Ring with start A 1-3 B 3-6 C 6-9 D 9-1
@@ -140,14 +179,14 @@ public class MoveTransientTest
         Range<Token> aPrimeRange = new Range<>(oneToken, fourToken);
 
         RangesAtEndpoint updated = RangesAtEndpoint.of(
-                new Replica(aAddress, aPrimeRange, true),
-                new Replica(aAddress, eRange, true),
-                new Replica(aAddress, dRange, false)
+                new Replica(address01, aPrimeRange, true),
+                new Replica(address01, range_11_1, true),
+                new Replica(address01, range_9_11, false)
         );
 
-        Pair<RangesAtEndpoint, RangesAtEndpoint> result = StorageService.calculateStreamAndFetchRanges(current, updated);
+        Pair<RangesAtEndpoint, RangesAtEndpoint> result = RangeRelocator.calculateStreamAndFetchRanges(current, updated);
         assertContentsIgnoreOrder(result.left);
-        assertContentsIgnoreOrder(result.right, fullReplica(aAddress, threeToken, fourToken));
+        assertContentsIgnoreOrder(result.right, fullReplica(address01, threeToken, fourToken));
         return result;
     }
 
@@ -170,15 +209,15 @@ public class MoveTransientTest
         Range<Token> aPrimeRange = new Range<>(elevenToken, fourteenToken);
 
         RangesAtEndpoint updated = RangesAtEndpoint.of(
-            new Replica(aAddress, aPrimeRange, true),
-            new Replica(aAddress, dRange, true),
-            new Replica(aAddress, cRange, false)
+            new Replica(address01, aPrimeRange, true),
+            new Replica(address01, range_9_11, true),
+            new Replica(address01, range_6_9, false)
         );
 
 
-        Pair<RangesAtEndpoint, RangesAtEndpoint> result = StorageService.calculateStreamAndFetchRanges(current, updated);
-        assertContentsIgnoreOrder(result.left, fullReplica(aAddress, oneToken, threeToken), fullReplica(aAddress, fourteenToken, oneToken));
-        assertContentsIgnoreOrder(result.right, transientReplica(aAddress, sixToken, nineToken), fullReplica(aAddress, nineToken, elevenToken));
+        Pair<RangesAtEndpoint, RangesAtEndpoint> result = RangeRelocator.calculateStreamAndFetchRanges(current, updated);
+        assertContentsIgnoreOrder(result.left, fullReplica(address01, oneToken, threeToken), fullReplica(address01, fourteenToken, oneToken));
+        assertContentsIgnoreOrder(result.right, transientReplica(address01, sixToken, nineToken), fullReplica(address01, nineToken, elevenToken));
         return result;
     }
 
@@ -200,16 +239,16 @@ public class MoveTransientTest
         Range<Token> aPrimeRange = new Range<>(oneToken, twoToken);
 
         RangesAtEndpoint updated = RangesAtEndpoint.of(
-            new Replica(aAddress, aPrimeRange, true),
-            new Replica(aAddress, eRange, true),
-            new Replica(aAddress, dRange, false)
+            new Replica(address01, aPrimeRange, true),
+            new Replica(address01, range_11_1, true),
+            new Replica(address01, range_9_11, false)
         );
 
-        Pair<RangesAtEndpoint, RangesAtEndpoint> result = StorageService.calculateStreamAndFetchRanges(current, updated);
+        Pair<RangesAtEndpoint, RangesAtEndpoint> result = RangeRelocator.calculateStreamAndFetchRanges(current, updated);
 
         //Moving backwards has no impact on any replica. We already fully replicate counter clockwise
         //The transient replica does transiently replicate slightly more, but that is addressed by cleanup
-        assertContentsIgnoreOrder(result.left, fullReplica(aAddress, twoToken, threeToken));
+        assertContentsIgnoreOrder(result.left, fullReplica(address01, twoToken, threeToken));
         assertContentsIgnoreOrder(result.right);
 
         return result;
@@ -226,17 +265,16 @@ public class MoveTransientTest
         Range<Token> aPrimeRange = new Range<>(sixToken, sevenToken);
         Range<Token> bPrimeRange = new Range<>(oneToken, sixToken);
 
-
         RangesAtEndpoint updated = RangesAtEndpoint.of(
-            new Replica(aAddress, aPrimeRange, true),
-            new Replica(aAddress, bPrimeRange, true),
-            new Replica(aAddress, eRange, false)
+            new Replica(address01, aPrimeRange, true),
+            new Replica(address01, bPrimeRange, true),
+            new Replica(address01, range_11_1, false)
         );
 
-        Pair<RangesAtEndpoint, RangesAtEndpoint> result = StorageService.calculateStreamAndFetchRanges(current, updated);
+        Pair<RangesAtEndpoint, RangesAtEndpoint> result = RangeRelocator.calculateStreamAndFetchRanges(current, updated);
 
-        assertContentsIgnoreOrder(result.left, fullReplica(aAddress, elevenToken, oneToken), transientReplica(aAddress, nineToken, elevenToken));
-        assertContentsIgnoreOrder(result.right, fullReplica(aAddress, threeToken, sixToken), fullReplica(aAddress, sixToken, sevenToken));
+        assertContentsIgnoreOrder(result.left, fullReplica(address01, elevenToken, oneToken), transientReplica(address01, nineToken, elevenToken));
+        assertContentsIgnoreOrder(result.right, fullReplica(address01, threeToken, sixToken), fullReplica(address01, sixToken, sevenToken));
         return result;
     }
 
@@ -252,6 +290,37 @@ public class MoveTransientTest
         calculateStreamAndFetchRangesMoveForwardBetween();
     }
 
+    @Test
+    public void testResubtract()
+    {
+        Token oneToken = new RandomPartitioner.BigIntegerToken("0001");
+        Token tenToken = new RandomPartitioner.BigIntegerToken("0010");
+        Token fiveToken = new RandomPartitioner.BigIntegerToken("0005");
+
+        Range<Token> range_1_10 = new Range<>(oneToken, tenToken);
+        Range<Token> range_1_5 = new Range<>(oneToken, fiveToken);
+        Range<Token> range_5_10 = new Range<>(fiveToken, tenToken);
+
+        RangesAtEndpoint singleRange = RangesAtEndpoint.of(
+        new Replica(address01, range_1_10, true)
+        );
+
+        RangesAtEndpoint splitRanges = RangesAtEndpoint.of(
+        new Replica(address01, range_1_5, true),
+        new Replica(address01, range_5_10, true)
+        );
+
+        // forward
+        Pair<RangesAtEndpoint, RangesAtEndpoint> calculated = RangeRelocator.calculateStreamAndFetchRanges(singleRange, splitRanges);
+        assertTrue(calculated.left.toString(), calculated.left.isEmpty());
+        assertTrue(calculated.right.toString(), calculated.right.isEmpty());
+
+        // backward
+        calculated = RangeRelocator.calculateStreamAndFetchRanges(splitRanges, singleRange);
+        assertTrue(calculated.left.toString(), calculated.left.isEmpty());
+        assertTrue(calculated.right.toString(), calculated.right.isEmpty());
+    }
+
     /**
      * Construct the ring state for calculateStreamAndFetchRangesMoveBackwardBetween
      * Where A moves from 3 to 14
@@ -260,12 +329,12 @@ public class MoveTransientTest
     private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveBackwardBetween()
     {
         TokenMetadata tmd = new TokenMetadata();
-        tmd.updateNormalToken(aRange.right, aAddress);
-        tmd.updateNormalToken(bRange.right, bAddress);
-        tmd.updateNormalToken(cRange.right, cAddress);
-        tmd.updateNormalToken(dRange.right, dAddress);
-        tmd.updateNormalToken(eRange.right, eAddress);
-        tmd.addMovingEndpoint(fourteenToken, aAddress);
+        tmd.updateNormalToken(range_1_2.right, address01);
+        tmd.updateNormalToken(range_3_6.right, address02);
+        tmd.updateNormalToken(range_6_9.right, address03);
+        tmd.updateNormalToken(range_9_11.right, address04);
+        tmd.updateNormalToken(range_11_1.right, address05);
+        tmd.addMovingEndpoint(fourteenToken, address01);
         TokenMetadata updated = tmd.cloneAfterAllSettled();
 
         return Pair.create(tmd, updated);
@@ -280,12 +349,12 @@ public class MoveTransientTest
     private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveForwardBetween()
     {
         TokenMetadata tmd = new TokenMetadata();
-        tmd.updateNormalToken(aRange.right, aAddress);
-        tmd.updateNormalToken(bRange.right, bAddress);
-        tmd.updateNormalToken(cRange.right, cAddress);
-        tmd.updateNormalToken(dRange.right, dAddress);
-        tmd.updateNormalToken(eRange.right, eAddress);
-        tmd.addMovingEndpoint(sevenToken, aAddress);
+        tmd.updateNormalToken(range_1_2.right, address01);
+        tmd.updateNormalToken(range_3_6.right, address02);
+        tmd.updateNormalToken(range_6_9.right, address03);
+        tmd.updateNormalToken(range_9_11.right, address04);
+        tmd.updateNormalToken(range_11_1.right, address05);
+        tmd.addMovingEndpoint(sevenToken, address01);
         TokenMetadata updated = tmd.cloneAfterAllSettled();
 
         return Pair.create(tmd, updated);
@@ -294,12 +363,12 @@ public class MoveTransientTest
     private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveBackward()
     {
         TokenMetadata tmd = new TokenMetadata();
-        tmd.updateNormalToken(aRange.right, aAddress);
-        tmd.updateNormalToken(bRange.right, bAddress);
-        tmd.updateNormalToken(cRange.right, cAddress);
-        tmd.updateNormalToken(dRange.right, dAddress);
-        tmd.updateNormalToken(eRange.right, eAddress);
-        tmd.addMovingEndpoint(twoToken, aAddress);
+        tmd.updateNormalToken(range_1_2.right, address01);
+        tmd.updateNormalToken(range_3_6.right, address02);
+        tmd.updateNormalToken(range_6_9.right, address03);
+        tmd.updateNormalToken(range_9_11.right, address04);
+        tmd.updateNormalToken(range_11_1.right, address05);
+        tmd.addMovingEndpoint(twoToken, address01);
         TokenMetadata updated = tmd.cloneAfterAllSettled();
 
         return Pair.create(tmd, updated);
@@ -308,12 +377,12 @@ public class MoveTransientTest
     private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveForward()
     {
         TokenMetadata tmd = new TokenMetadata();
-        tmd.updateNormalToken(aRange.right, aAddress);
-        tmd.updateNormalToken(bRange.right, bAddress);
-        tmd.updateNormalToken(cRange.right, cAddress);
-        tmd.updateNormalToken(dRange.right, dAddress);
-        tmd.updateNormalToken(eRange.right, eAddress);
-        tmd.addMovingEndpoint(fourToken, aAddress);
+        tmd.updateNormalToken(range_1_2.right, address01);
+        tmd.updateNormalToken(range_3_6.right, address02);
+        tmd.updateNormalToken(range_6_9.right, address03);
+        tmd.updateNormalToken(range_9_11.right, address04);
+        tmd.updateNormalToken(range_11_1.right, address05);
+        tmd.addMovingEndpoint(fourToken, address01);
         TokenMetadata updated = tmd.cloneAfterAllSettled();
 
         return Pair.create(tmd, updated);
@@ -325,15 +394,15 @@ public class MoveTransientTest
     {
         EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable();
 
-        InetAddressAndPort cOrB = (downNodes.contains(cAddress) || sourceFilterDownNodes.contains(cAddress)) ? bAddress : cAddress;
+        InetAddressAndPort cOrB = (downNodes.contains(address03) || sourceFilterDownNodes.contains(address03)) ? address02 : address03;
 
         //Need to pull the full replica and the transient replica that is losing the range
-        expectedResult.put(fullReplica(aAddress, sixToken, sevenToken),  fullReplica(dAddress, sixToken, nineToken));
-        expectedResult.put(fullReplica(aAddress, sixToken, sevenToken), transientReplica(eAddress, sixToken, nineToken));
+        expectedResult.put(fullReplica(address01, sixToken, sevenToken), fullReplica(address04, sixToken, nineToken));
+        expectedResult.put(fullReplica(address01, sixToken, sevenToken), transientReplica(address05, sixToken, nineToken));
 
         //Same need both here as well
-        expectedResult.put(fullReplica(aAddress, threeToken, sixToken), fullReplica(cOrB, threeToken, sixToken));
-        expectedResult.put(fullReplica(aAddress, threeToken, sixToken), transientReplica(dAddress, threeToken, sixToken));
+        expectedResult.put(fullReplica(address01, threeToken, sixToken), fullReplica(cOrB, threeToken, sixToken));
+        expectedResult.put(fullReplica(address01, threeToken, sixToken), transientReplica(address04, threeToken, sixToken));
 
         invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForwardBetween().right,
                                                            constructTMDsMoveForwardBetween(),
@@ -343,7 +412,7 @@ public class MoveTransientTest
     @Test
     public void testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodes() throws Exception
     {
-        for (InetAddressAndPort downNode : new InetAddressAndPort[] {dAddress, eAddress})
+        for (InetAddressAndPort downNode : new InetAddressAndPort[] { address04, address05 })
         {
             downNodes.clear();
             downNodes.add(downNode);
@@ -356,8 +425,7 @@ public class MoveTransientTest
             {
                 ise.printStackTrace();
                 assertTrue(downNode.toString(),
-                           ise.getMessage().startsWith("A node required to move the data consistently is down:")
-                                    && ise.getMessage().contains(downNode.toString()));
+                           ise.getMessage().contains("Down nodes: [" + downNode + "]"));
                 threw = true;
             }
             assertTrue("Didn't throw for " + downNode, threw);
@@ -365,14 +433,14 @@ public class MoveTransientTest
 
         //Shouldn't throw because another full replica is available
         downNodes.clear();
-        downNodes.add(cAddress);
+        downNodes.add(address03);
         testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
     }
 
     @Test
     public void testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter() throws Exception
     {
-        for (InetAddressAndPort downNode : new InetAddressAndPort[] {dAddress, eAddress})
+        for (InetAddressAndPort downNode : new InetAddressAndPort[] { address04, address05 })
         {
             sourceFilterDownNodes.clear();
             sourceFilterDownNodes.add(downNode);
@@ -394,7 +462,7 @@ public class MoveTransientTest
 
         //Shouldn't throw because another full replica is available
         sourceFilterDownNodes.clear();
-        sourceFilterDownNodes.add(cAddress);
+        sourceFilterDownNodes.add(address03);
         testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
     }
 
@@ -404,8 +472,8 @@ public class MoveTransientTest
         EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable();
 
         //Need to pull the full replica and the transient replica that is losing the range
-        expectedResult.put(fullReplica(aAddress, nineToken, elevenToken), fullReplica(eAddress, nineToken, elevenToken));
-        expectedResult.put(transientReplica(aAddress, sixToken, nineToken), transientReplica(eAddress, sixToken, nineToken));
+        expectedResult.put(fullReplica(address01, nineToken, elevenToken), fullReplica(address05, nineToken, elevenToken));
+        expectedResult.put(transientReplica(address01, sixToken, nineToken), transientReplica(address05, sixToken, nineToken));
 
         invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackwardBetween().right,
                                                            constructTMDsMoveBackwardBetween(),
@@ -417,7 +485,7 @@ public class MoveTransientTest
     public void testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodes() throws Exception
     {
         //Any replica can be the full replica so this will always fail on the transient range
-        downNodes.add(eAddress);
+        downNodes.add(address05);
         testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
     }
 
@@ -425,7 +493,7 @@ public class MoveTransientTest
     public void testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter() throws Exception
     {
         //Any replica can be the full replica so this will always fail on the transient range
-        sourceFilterDownNodes.add(eAddress);
+        sourceFilterDownNodes.add(address05);
         testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
     }
 
@@ -448,11 +516,11 @@ public class MoveTransientTest
     {
         EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable();
 
-        InetAddressAndPort cOrBAddress = (downNodes.contains(cAddress) || sourceFilterDownNodes.contains(cAddress)) ? bAddress : cAddress;
+        InetAddressAndPort cOrBAddress = (downNodes.contains(address03) || sourceFilterDownNodes.contains(address03)) ? address02 : address03;
 
         //Need to pull the full replica and the transient replica that is losing the range
-        expectedResult.put(fullReplica(aAddress, threeToken, fourToken), fullReplica(cOrBAddress, threeToken, sixToken));
-        expectedResult.put(fullReplica(aAddress, threeToken, fourToken), transientReplica(dAddress, threeToken, sixToken));
+        expectedResult.put(fullReplica(address01, threeToken, fourToken), fullReplica(cOrBAddress, threeToken, sixToken));
+        expectedResult.put(fullReplica(address01, threeToken, fourToken), transientReplica(address04, threeToken, sixToken));
 
         invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForward().right,
                                                            constructTMDsMoveForward(),
@@ -463,7 +531,7 @@ public class MoveTransientTest
     @Test
     public void testMoveForwardCalculateRangesToFetchWithPreferredEndpointsDownNodes() throws Exception
     {
-        downNodes.add(dAddress);
+        downNodes.add(address04);
         boolean threw = false;
         try
         {
@@ -472,23 +540,22 @@ public class MoveTransientTest
         catch (IllegalStateException ise)
         {
             ise.printStackTrace();
-            assertTrue(dAddress.toString(),
-                       ise.getMessage().startsWith("A node required to move the data consistently is down:")
-                       && ise.getMessage().contains(dAddress.toString()));
+            assertTrue(address04.toString(),
+                       ise.getMessage().contains("Down nodes: [" + address04 + "]"));
             threw = true;
         }
-        assertTrue("Didn't throw for " + dAddress, threw);
+        assertTrue("Didn't throw for " + address04, threw);
 
         //Shouldn't throw because another full replica is available
         downNodes.clear();
-        downNodes.add(cAddress);
+        downNodes.add(address03);
         testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
     }
 
     @Test
     public void testMoveForwardCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter() throws Exception
     {
-        sourceFilterDownNodes.add(dAddress);
+        sourceFilterDownNodes.add(address04);
         boolean threw = false;
         try
         {
@@ -497,16 +564,16 @@ public class MoveTransientTest
         catch (IllegalStateException ise)
         {
             ise.printStackTrace();
-            assertTrue(dAddress.toString(),
+            assertTrue(address04.toString(),
                        ise.getMessage().startsWith("Necessary replicas for strict consistency were removed by source filters:")
-                       && ise.getMessage().contains(dAddress.toString()));
+                       && ise.getMessage().contains(address04.toString()));
             threw = true;
         }
-        assertTrue("Didn't throw for " + dAddress, threw);
+        assertTrue("Didn't throw for " + address04, threw);
 
         //Shouldn't throw because another full replica is available
         sourceFilterDownNodes.clear();
-        sourceFilterDownNodes.add(cAddress);
+        sourceFilterDownNodes.add(address03);
         testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
     }
 
@@ -517,18 +584,16 @@ public class MoveTransientTest
         DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
 
         EndpointsByReplica result = RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) -> replicas.sorted((a, b) -> b.endpoint().compareTo(a.endpoint())),
-                                                                                                                   simpleStrategy(tmds.left),
-                                                                                                                   toFetch,
-                                                                                                                   true,
-                                                                                                                   tmds.left,
-                                                                                                                   tmds.right,
-                                                                                                                   alivePredicate,
-                                                                                                                   "OldNetworkTopologyStrategyTest",
-                                                                                                                   sourceFilters);
+                                                                                               simpleStrategy(tmds.left),
+                                                                                               toFetch,
+                                                                                               true,
+                                                                                               tmds.left,
+                                                                                               tmds.right,
+                                                                                               "OldNetworkTopologyStrategyTest",
+                                                                                               sourceFilters);
         logger.info("Ranges to fetch with preferred endpoints");
         logger.info(result.toString());
         assertMultimapEqualsIgnoreOrder(expectedResult, result);
-
     }
 
     private AbstractReplicationStrategy simpleStrategy(TokenMetadata tmd)
@@ -564,8 +629,8 @@ public class MoveTransientTest
         RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable();
 
         //Need to pull the full replica and the transient replica that is losing the range
-        expectedResult.put(bAddress, transientReplica(bAddress, nineToken, elevenToken));
-        expectedResult.put(bAddress, fullReplica(bAddress, elevenToken, oneToken));
+        expectedResult.put(address02, transientReplica(address02, nineToken, elevenToken));
+        expectedResult.put(address02, fullReplica(address02, elevenToken, oneToken));
 
         invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForwardBetween().left,
                                                             constructTMDsMoveForwardBetween(),
@@ -577,12 +642,12 @@ public class MoveTransientTest
     {
         RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable();
 
-        expectedResult.put(bAddress, fullReplica(bAddress, fourteenToken, oneToken));
+        expectedResult.put(address02, fullReplica(address02, fourteenToken, oneToken));
 
-        expectedResult.put(dAddress, transientReplica(dAddress, oneToken, threeToken));
+        expectedResult.put(address04, transientReplica(address04, oneToken, threeToken));
 
-        expectedResult.put(cAddress, fullReplica(cAddress, oneToken, threeToken));
-        expectedResult.put(cAddress, transientReplica(cAddress, fourteenToken, oneToken));
+        expectedResult.put(address03, fullReplica(address03, oneToken, threeToken));
+        expectedResult.put(address03, transientReplica(address03, fourteenToken, oneToken));
 
         invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackwardBetween().left,
                                                             constructTMDsMoveBackwardBetween(),
@@ -593,8 +658,8 @@ public class MoveTransientTest
     public void testMoveBackwardCalculateRangesToStreamWithPreferredEndpoints() throws Exception
     {
         RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable();
-        expectedResult.put(cAddress, fullReplica(cAddress, twoToken, threeToken));
-        expectedResult.put(dAddress, transientReplica(dAddress, twoToken, threeToken));
+        expectedResult.put(address03, fullReplica(address03, twoToken, threeToken));
+        expectedResult.put(address04, transientReplica(address04, twoToken, threeToken));
 
         invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackward().left,
                                                             constructTMDsMoveBackward(),
@@ -617,7 +682,7 @@ public class MoveTransientTest
                                                                      RangesByEndpoint expectedResult)
     {
         DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
-        StorageService.RangeRelocator relocator = new StorageService.RangeRelocator();
+        RangeRelocator relocator = new RangeRelocator();
         RangesByEndpoint result = relocator.calculateRangesToStreamWithEndpoints(toStream,
                                                                                  simpleStrategy(tmds.left),
                                                                                  tmds.left,
@@ -631,8 +696,10 @@ public class MoveTransientTest
     {
         assertEquals(ranges.size(), replicas.length);
         for (Replica replica : replicas)
+        {
             if (!ranges.contains(replica))
-                assertEquals(RangesAtEndpoint.of(replicas), ranges);
+                assertTrue(Iterables.elementsEqual(RangesAtEndpoint.of(replicas), ranges));
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/service/StorageServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/StorageServiceTest.java b/test/unit/org/apache/cassandra/service/StorageServiceTest.java
index 9d5c324..cc7fac3 100644
--- a/test/unit/org/apache/cassandra/service/StorageServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageServiceTest.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class StorageServiceTest
 {
@@ -106,21 +107,32 @@ public class StorageServiceTest
     public static <K, C extends ReplicaCollection<? extends C>>  void assertMultimapEqualsIgnoreOrder(ReplicaMultimap<K, C> a, ReplicaMultimap<K, C> b)
     {
         if (!a.keySet().equals(b.keySet()))
-            assertEquals(a, b);
+            fail(formatNeq(a, b));
         for (K key : a.keySet())
         {
             C ac = a.get(key);
             C bc = b.get(key);
             if (ac.size() != bc.size())
-                assertEquals(a, b);
+                fail(formatNeq(a, b));
             for (Replica r : ac)
             {
                 if (!bc.contains(r))
-                    assertEquals(a, b);
+                    fail(formatNeq(a, b));
             }
         }
     }
 
+    public static String formatNeq(Object v1, Object v2)
+    {
+        return "\nExpected: " + formatClassAndValue(v1) + "\n but was: " + formatClassAndValue(v2);
+    }
+
+    public static String formatClassAndValue(Object value)
+    {
+        String className = value == null ? "null" : value.getClass().getName();
+        return className + "<" + String.valueOf(value) + ">";
+    }
+
     @Test
     public void testGetChangedReplicasForLeaving() throws Exception
     {




[2/2] cassandra git commit: Transient replication: range movement improvements

Posted by if...@apache.org.
Transient replication: range movement improvements

Patch by Alex Petrov; reviewed by Ariel Weisberg and Benedict Elliott Smith for CASSANDRA-14756

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0379201c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0379201c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0379201c

Branch: refs/heads/trunk
Commit: 0379201c7057f6bac4abf1e0f3d81a12d90abd08
Parents: 210da3d
Author: Alex Petrov <ol...@gmail.com>
Authored: Mon Sep 17 11:51:56 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Wed Sep 26 11:42:46 2018 +0200

----------------------------------------------------------------------
 .../org/apache/cassandra/db/SystemKeyspace.java |  31 +-
 .../org/apache/cassandra/dht/BootStrapper.java  |   3 -
 .../cassandra/dht/RangeFetchMapCalculator.java  |   2 +-
 .../org/apache/cassandra/dht/RangeStreamer.java | 448 ++++++++++---------
 .../apache/cassandra/dht/StreamStateStore.java  |  12 +-
 .../cassandra/locator/RangesAtEndpoint.java     |   6 +
 .../cassandra/service/RangeRelocator.java       | 324 ++++++++++++++
 .../cassandra/service/StorageService.java       | 314 +------------
 .../apache/cassandra/streaming/StreamPlan.java  |  17 +-
 .../cassandra/streaming/StreamSession.java      |   8 +-
 .../apache/cassandra/dht/BootStrapperTest.java  |  17 +-
 .../dht/RangeFetchMapCalculatorTest.java        |  79 +++-
 .../locator/OldNetworkTopologyStrategyTest.java |   3 +-
 .../service/BootstrapTransientTest.java         | 113 +++--
 .../cassandra/service/MoveTransientTest.java    | 321 +++++++------
 .../cassandra/service/StorageServiceTest.java   |  18 +-
 16 files changed, 981 insertions(+), 735 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index ff070a3..0f904ce 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -32,12 +32,11 @@ import javax.management.openmbean.TabularData;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
 import com.google.common.io.ByteStreams;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.locator.Replica;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1285,24 +1284,40 @@ public final class SystemKeyspace
                         keyspace);
     }
 
-    public static synchronized RangesAtEndpoint getAvailableRanges(String keyspace, IPartitioner partitioner)
+    /**
+     * List of the streamed ranges, where transientness is encoded based on the source the range was streamed from.
+     */
+    public static synchronized AvailableRanges getAvailableRanges(String keyspace, IPartitioner partitioner)
     {
         String query = "SELECT * FROM system.%s WHERE keyspace_name=?";
         UntypedResultSet rs = executeInternal(format(query, AVAILABLE_RANGES_V2), keyspace);
-        InetAddressAndPort endpoint = InetAddressAndPort.getLocalHost();
-        RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(endpoint);
+
+        ImmutableSet.Builder<Range<Token>> full = new ImmutableSet.Builder<>();
+        ImmutableSet.Builder<Range<Token>> trans = new ImmutableSet.Builder<>();
         for (UntypedResultSet.Row row : rs)
         {
             Optional.ofNullable(row.getSet("full_ranges", BytesType.instance))
                     .ifPresent(full_ranges -> full_ranges.stream()
                             .map(buf -> byteBufferToRange(buf, partitioner))
-                            .forEach(range -> builder.add(fullReplica(endpoint, range))));
+                            .forEach(full::add));
             Optional.ofNullable(row.getSet("transient_ranges", BytesType.instance))
                     .ifPresent(transient_ranges -> transient_ranges.stream()
                             .map(buf -> byteBufferToRange(buf, partitioner))
-                            .forEach(range -> builder.add(transientReplica(endpoint, range))));
+                            .forEach(trans::add));
+        }
+        return new AvailableRanges(full.build(), trans.build());
+    }
+
+    public static class AvailableRanges
+    {
+        public Set<Range<Token>> full;
+        public Set<Range<Token>> trans;
+
+        private AvailableRanges(Set<Range<Token>> full, Set<Range<Token>> trans)
+        {
+            this.full = full;
+            this.trans = trans;
         }
-        return builder.build();
     }
 
     public static void resetAvailableRanges()

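As a quick orientation for the new return type, a minimal consumption sketch; the keyspace name and the someRange variable are illustrative placeholders, not part of this patch:

    SystemKeyspace.AvailableRanges available =
        SystemKeyspace.getAvailableRanges("my_keyspace", DatabaseDescriptor.getPartitioner());

    // Ranges streamed from a full source vs. ranges streamed from a transient source.
    Set<Range<Token>> fullyAvailable = available.full;
    Set<Range<Token>> transientlyAvailable = available.trans;

    // A range counts as locally streamed if it appears in either set.
    boolean streamed = fullyAvailable.contains(someRange) || transientlyAvailable.contains(someRange);
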
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 92bf8c8..cef605e 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -79,9 +79,6 @@ public class BootStrapper extends ProgressEventNotifierSupport
                                                    stateStore,
                                                    true,
                                                    DatabaseDescriptor.getStreamingConnectionsPerHost());
-        streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
-        streamer.addSourceFilter(new RangeStreamer.ExcludeLocalNodeFilter());
-
         for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces())
         {
             AbstractReplicationStrategy strategy = Keyspace.open(keyspaceName).getReplicationStrategy();

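The two filters removed here are not lost: as the RangeStreamer hunk further down shows, the failure-detector and local-node filters are now registered unconditionally in the constructor. A hedged sketch of the resulting caller-side wiring (tokenMetadata, tokens, address and stateStore are placeholders):

    RangeStreamer streamer = new RangeStreamer(tokenMetadata, tokens, address,
                                               StreamOperation.BOOTSTRAP,
                                               true /* useStrictConsistency */,
                                               DatabaseDescriptor.getEndpointSnitch(),
                                               stateStore,
                                               true /* connectSequentially */,
                                               DatabaseDescriptor.getStreamingConnectionsPerHost());
    // Down sources and the local node are already excluded by the constructor;
    // only operation-specific filters still need to be added via addSourceFilter(...).
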
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
index 4b98b97..63265b7 100644
--- a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
+++ b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
@@ -84,7 +84,7 @@ public class RangeFetchMapCalculator
     private final Set<Range<Token>> trivialRanges;
 
     public RangeFetchMapCalculator(EndpointsByRange rangesWithSources,
-                                   Collection<Predicate<Replica>> sourceFilters,
+                                   Collection<RangeStreamer.SourceFilter> sourceFilters,
                                    String keyspace)
     {
         this.rangesWithSources = rangesWithSources;

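A minimal sketch of the updated calculator entry point under the new filter type; rangesWithSources and the keyspace name are placeholders, and the usual java.util / Guava imports are assumed:

    Collection<RangeStreamer.SourceFilter> filters =
        Collections.singletonList(new RangeStreamer.ExcludeLocalNodeFilter());

    RangeFetchMapCalculator calculator =
        new RangeFetchMapCalculator(rangesWithSources, filters, "my_keyspace");
    Multimap<InetAddressAndPort, Range<Token>> fetchMap = calculator.getRangeFetchMap();
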
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index e8aa5d3..f46d665 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -27,9 +27,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Multimap;
 
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.EndpointsByReplica;
@@ -53,12 +53,12 @@ import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaCollection;
 import org.apache.cassandra.locator.Replicas;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.StreamPlan;
 import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 import static com.google.common.base.Predicates.and;
 import static com.google.common.base.Predicates.not;
@@ -87,8 +87,8 @@ public class RangeStreamer
     private final InetAddressAndPort address;
     /* streaming description */
     private final String description;
-    private final Multimap<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch = HashMultimap.create();
-    private final Set<Predicate<Replica>> sourceFilters = new HashSet<>();
+    private final Map<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch = new HashMap<>();
+    private final List<SourceFilter> sourceFilters = new ArrayList<>();
     private final StreamPlan streamPlan;
     private final boolean useStrictConsistency;
     private final IEndpointSnitch snitch;
@@ -97,6 +97,7 @@ public class RangeStreamer
     public static class FetchReplica
     {
         public final Replica local;
+        // Source replica
         public final Replica remote;
 
         public FetchReplica(Replica local, Replica remote)
@@ -135,11 +136,17 @@ public class RangeStreamer
         }
     }
 
+    public interface SourceFilter extends Predicate<Replica>
+    {
+        public boolean apply(Replica replica);
+        public String message(Replica replica);
+    }
+
     /**
      * Source filter which excludes any endpoints that are not alive according to a
      * failure detector.
      */
-    public static class FailureDetectorSourceFilter implements Predicate<Replica>
+    public static class FailureDetectorSourceFilter implements SourceFilter
     {
         private final IFailureDetector fd;
 
@@ -148,16 +155,23 @@ public class RangeStreamer
             this.fd = fd;
         }
 
+        @Override
         public boolean apply(Replica replica)
         {
             return fd.isAlive(replica.endpoint());
         }
+
+        @Override
+        public String message(Replica replica)
+        {
+            return "Filtered " + replica + " out because it was down";
+        }
     }
 
     /**
      * Source filter which excludes any endpoints that are not in a specific data center.
      */
-    public static class SingleDatacenterFilter implements Predicate<Replica>
+    public static class SingleDatacenterFilter implements SourceFilter
     {
         private final String sourceDc;
         private final IEndpointSnitch snitch;
@@ -168,27 +182,41 @@ public class RangeStreamer
             this.snitch = snitch;
         }
 
+        @Override
         public boolean apply(Replica replica)
         {
             return snitch.getDatacenter(replica).equals(sourceDc);
         }
+
+        @Override
+        public String message(Replica replica)
+        {
+            return "Filtered " + replica + " out because it does not belong to " + sourceDc + " datacenter";
+        }
     }
 
     /**
      * Source filter which excludes the current node from source calculations
      */
-    public static class ExcludeLocalNodeFilter implements Predicate<Replica>
+    public static class ExcludeLocalNodeFilter implements SourceFilter
     {
+        @Override
         public boolean apply(Replica replica)
         {
             return !replica.isLocal();
         }
+
+        @Override
+        public String message(Replica replica)
+        {
+            return "Filtered " + replica + " out because it is local";
+        }
     }
 
     /**
      * Source filter which only includes endpoints contained within a provided set.
      */
-    public static class WhitelistedSourcesFilter implements Predicate<Replica>
+    public static class WhitelistedSourcesFilter implements SourceFilter
     {
         private final Set<InetAddressAndPort> whitelistedSources;
 
@@ -201,6 +229,12 @@ public class RangeStreamer
         {
             return whitelistedSources.contains(replica.endpoint());
         }
+
+        @Override
+        public String message(Replica replica)
+        {
+            return "Filtered " + replica + " out because it was not whitelisted, whitelisted sources: " + whitelistedSources;
+        }
     }
 
     public RangeStreamer(TokenMetadata metadata,
@@ -213,6 +247,21 @@ public class RangeStreamer
                          boolean connectSequentially,
                          int connectionsPerHost)
     {
+        this(metadata, tokens, address, streamOperation, useStrictConsistency, snitch, stateStore,
+             FailureDetector.instance, connectSequentially, connectionsPerHost);
+    }
+
+    RangeStreamer(TokenMetadata metadata,
+                  Collection<Token> tokens,
+                  InetAddressAndPort address,
+                  StreamOperation streamOperation,
+                  boolean useStrictConsistency,
+                  IEndpointSnitch snitch,
+                  StreamStateStore stateStore,
+                  IFailureDetector failureDetector,
+                  boolean connectSequentially,
+                  int connectionsPerHost)
+    {
         Preconditions.checkArgument(streamOperation == StreamOperation.BOOTSTRAP || streamOperation == StreamOperation.REBUILD, streamOperation);
         this.metadata = metadata;
         this.tokens = tokens;
@@ -223,13 +272,34 @@ public class RangeStreamer
         this.snitch = snitch;
         this.stateStore = stateStore;
         streamPlan.listeners(this.stateStore);
+
+        // We always filter out the local node and down sources
+        addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(failureDetector));
+        addSourceFilter(new RangeStreamer.ExcludeLocalNodeFilter());
     }
 
-    public void addSourceFilter(Predicate<Replica> filter)
+    public void addSourceFilter(SourceFilter filter)
     {
         sourceFilters.add(filter);
     }
 
+    // Creates error message from source filters
+    private static String buildErrorMessage(Collection<SourceFilter> sourceFilters, ReplicaCollection<?> replicas)
+    {
+        StringBuilder failureMessage = new StringBuilder();
+        for (Replica r : replicas)
+        {
+            for (SourceFilter filter : sourceFilters)
+            {
+                if (!filter.apply(r))
+                {
+                    failureMessage.append(filter.message(r));
+                    break;
+                }
+            }
+        }
+        return failureMessage.toString();
+    }
     /**
      * Add ranges to be streamed for given keyspace.
      *
@@ -252,7 +322,6 @@ public class RangeStreamer
         for (Map.Entry<Replica, Replica> entry : fetchMap.flattenEntries())
             logger.info("{}: range {} exists on {} for keyspace {}", description, entry.getKey(), entry.getValue(), keyspaceName);
 
-
         Multimap<InetAddressAndPort, FetchReplica> workMap;
         //Only use the optimized strategy if we don't care about strict sources, have a replication factor > 1, and no
         //transient replicas.
@@ -265,10 +334,12 @@ public class RangeStreamer
             workMap = getOptimizedWorkMap(fetchMap, sourceFilters, keyspaceName);
         }
 
-        toFetch.put(keyspaceName, workMap);
-        for (Map.Entry<InetAddressAndPort, Collection<FetchReplica>> entry : workMap.asMap().entrySet())
+        if (toFetch.put(keyspaceName, workMap) != null)
+            throw new IllegalArgumentException("Keyspace is already added to fetch map");
+
+        if (logger.isTraceEnabled())
         {
-            if (logger.isTraceEnabled())
+            for (Map.Entry<InetAddressAndPort, Collection<FetchReplica>> entry : workMap.asMap().entrySet())
             {
                 for (FetchReplica r : entry.getValue())
                     logger.trace("{}: range source {} local range {} for keyspace {}", description, r.remote, r.local, keyspaceName);
@@ -289,10 +360,6 @@ public class RangeStreamer
 
     /**
      * Wrapper method to assemble the arguments for invoking the implementation with RangeStreamer's parameters
-     * @param fetchRanges
-     * @param keyspace
-     * @param useStrictConsistency
-     * @return
      */
     private EndpointsByReplica calculateRangesToFetchWithPreferredEndpoints(ReplicaCollection<?> fetchRanges, Keyspace keyspace, boolean useStrictConsistency)
     {
@@ -305,7 +372,7 @@ public class RangeStreamer
         if (tokens != null)
         {
             // Pending ranges
-            tmdAfter =  tmd.cloneOnlyTokenMap();
+            tmdAfter = tmd.cloneOnlyTokenMap();
             tmdAfter.updateNormalTokens(tokens, address);
         }
         else if (useStrictConsistency)
@@ -313,15 +380,14 @@ public class RangeStreamer
             throw new IllegalArgumentException("Can't ask for strict consistency and not supply tokens");
         }
 
-        return RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(snitch::sortedByProximity,
-                                                                           strat,
-                                                                           fetchRanges,
-                                                                           useStrictConsistency,
-                                                                           tmd,
-                                                                           tmdAfter,
-                                                                           ALIVE_PREDICATE,
-                                                                           keyspace.getName(),
-                                                                           sourceFilters);
+        return calculateRangesToFetchWithPreferredEndpoints(snitch::sortedByProximity,
+                                                            strat,
+                                                            fetchRanges,
+                                                            useStrictConsistency,
+                                                            tmd,
+                                                            tmdAfter,
+                                                            keyspace.getName(),
+                                                            sourceFilters);
 
     }
 
@@ -329,7 +395,6 @@ public class RangeStreamer
      * Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges.
      * For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
      * consistency.
-     *
      **/
      public static EndpointsByReplica
      calculateRangesToFetchWithPreferredEndpoints(BiFunction<InetAddressAndPort, EndpointsForRange, EndpointsForRange> snitchGetSortedListByProximity,
@@ -338,165 +403,148 @@ public class RangeStreamer
                                                   boolean useStrictConsistency,
                                                   TokenMetadata tmdBefore,
                                                   TokenMetadata tmdAfter,
-                                                  Predicate<Replica> isAlive,
                                                   String keyspace,
-                                                  Collection<Predicate<Replica>> sourceFilters)
-    {
-        EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
-
-        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
-        logger.debug ("Keyspace: {}", keyspace);
-        logger.debug("To fetch RN: {}", fetchRanges);
-        logger.debug("Fetch ranges: {}", rangeAddresses);
-
-        Predicate<Replica> testSourceFilters = and(sourceFilters);
-        Function<EndpointsForRange, EndpointsForRange> sorted =
-                endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
-
-        //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
-        EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
-        for (Replica toFetch : fetchRanges)
-        {
-            //Replica that is sufficient to provide the data we need
-            //With strict consistency and transient replication we may end up with multiple types
-            //so this isn't used with strict consistency
-            Predicate<Replica> isSufficient = r -> (toFetch.isTransient() || r.isFull());
-            Predicate<Replica> accept = r ->
-                       isSufficient.test(r)                 // is sufficient
-                    && !r.endpoint().equals(localAddress)   // is not self
-                    && isAlive.test(r);                     // is alive
-
-            logger.debug("To fetch {}", toFetch);
-            for (Range<Token> range : rangeAddresses.keySet())
-            {
-                if (range.contains(toFetch.range()))
-                {
-                    EndpointsForRange oldEndpoints = rangeAddresses.get(range);
-
-                    //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
-                    //It could be multiple endpoints and we must fetch from all of them if they are there
-                    //With transient replication and strict consistency this is to get the full data from a full replica and
-                    //transient data from the transient replica losing data
-                    EndpointsForRange sources;
-                    if (useStrictConsistency)
-                    {
-                        //Start with two sets of who replicates the range before and who replicates it after
-                        EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
-                        logger.debug("Old endpoints {}", oldEndpoints);
-                        logger.debug("New endpoints {}", newEndpoints);
-
-                        //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
-                        //So we need to be careful to only be strict when endpoints == RF
-                        if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
-                        {
-                            Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
-                            // Remove new endpoints from old endpoints based on address
-                            oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
-
-                            if (!all(oldEndpoints, isAlive))
-                                throw new IllegalStateException("A node required to move the data consistently is down: "
-                                                                + oldEndpoints.filter(not(isAlive)));
-
-                            if (oldEndpoints.size() > 1)
-                                throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints);
-
-                            //If we are transitioning from transient to full and and the set of replicas for the range is not changing
-                            //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely
-                            //since we are already a transient replica and the existing replica remains.
-                            //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
-                            //So it's an error if we don't find what we need.
-                            if (oldEndpoints.isEmpty() && toFetch.isTransient())
-                            {
-                                throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
-                            }
-
-                            if (!any(oldEndpoints, isSufficient))
-                            {
-                                // need an additional replica
-                                EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
-                                // include all our filters, to ensure we include a matching node
-                                Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(accept, testSourceFilters)).toJavaUtil();
-                                if (fullReplica.isPresent())
-                                    oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
-                                else
-                                    throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + endpointsForRange);
-                            }
-
-                            //We have to check the source filters here to see if they will remove any replicas
-                            //required for strict consistency
-                            if (!all(oldEndpoints, testSourceFilters))
-                                throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + oldEndpoints.filter(not(testSourceFilters)));
-                        }
-                        else
-                        {
-                            oldEndpoints = sorted.apply(oldEndpoints.filter(accept));
-                        }
-
-                        //Apply testSourceFilters that were given to us, and establish everything remaining is alive for the strict case
-                        sources = oldEndpoints.filter(testSourceFilters);
-                    }
-                    else
-                    {
-                        //Without strict consistency we have given up on correctness so no point in fetching from
-                        //a random full + transient replica since it's also likely to lose data
-                        //Also apply testSourceFilters that were given to us so we can safely select a single source
-                        sources = sorted.apply(rangeAddresses.get(range).filter(and(accept, testSourceFilters)));
-                        //Limit it to just the first possible source, we don't need more than one and downstream
-                        //will fetch from every source we supply
-                        sources = sources.size() > 0 ? sources.subList(0, 1) : sources;
-                    }
-
-                    // storing range and preferred endpoint set
-                    rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE);
-                    logger.debug("Endpoints to fetch for {} are {}", toFetch, sources);
-                }
-            }
-
-            EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
-            if (addressList == null)
-                throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch);
-
-            /*
-             * When we move forwards (shrink our bucket) we are the one losing a range and no one else loses
-             * from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica
-             * and the other is a transient replica. So we must need fetch from two places in that case for the full range we gain.
-             * For a transient range we only need to fetch from one.
-             */
-            if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1))
-                throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList));
-
-            //We must have enough stuff to fetch from
-            if ((toFetch.isFull() && !any(addressList, Replica::isFull)) || addressList.isEmpty())
-            {
-                if (strat.getReplicationFactor().allReplicas == 1)
-                {
-                    if (useStrictConsistency)
-                    {
-                        logger.warn("A node required to move the data consistently is down");
-                        throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " +
-                                                        "Ensure this keyspace contains replicas in the source datacenter.");
-                    }
-                    else
-                        logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " +
-                                    "Keyspace might be missing data.", toFetch, keyspace);
-
-                }
-                else
-                {
-                    if (useStrictConsistency)
-                        logger.warn("A node required to move the data consistently is down");
-                    throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace);
-                }
-            }
-        }
-        return rangesToFetchWithPreferredEndpoints.asImmutableView();
-    }
+                                                  Collection<SourceFilter> sourceFilters)
+     {
+         EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
+
+         InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
+         logger.debug ("Keyspace: {}", keyspace);
+         logger.debug("To fetch RN: {}", fetchRanges);
+         logger.debug("Fetch ranges: {}", rangeAddresses);
+
+         Predicate<Replica> testSourceFilters = and(sourceFilters);
+         Function<EndpointsForRange, EndpointsForRange> sorted =
+         endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
+
+         //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
+         EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
+         for (Replica toFetch : fetchRanges)
+         {
+             //Replica that is sufficient to provide the data we need
+             //With strict consistency and transient replication we may end up with multiple types
+             //so this isn't used with strict consistency
+             Predicate<Replica> isSufficient = r -> toFetch.isTransient() || r.isFull();
+
+             logger.debug("To fetch {}", toFetch);
+             for (Range<Token> range : rangeAddresses.keySet())
+             {
+                 if (!range.contains(toFetch.range()))
+                     continue;
+
+                 final EndpointsForRange oldEndpoints = sorted.apply(rangeAddresses.get(range));
+
+                 //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
+                 //It could be multiple endpoints and we must fetch from all of them if they are there
+                 //With transient replication and strict consistency this is to get the full data from a full replica and
+                 //transient data from the transient replica losing data
+                 EndpointsForRange sources;
+                 if (useStrictConsistency)
+                 {
+                     EndpointsForRange strictEndpoints;
+                     //Due to CASSANDRA-5953 we can have a higher RF than we have endpoints.
+                     //So we need to be careful to only be strict when endpoints == RF
+                     if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
+                     {
+                         //Start with two sets of who replicates the range before and who replicates it after
+                         EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
+                         logger.debug("Old endpoints {}", oldEndpoints);
+                         logger.debug("New endpoints {}", newEndpoints);
+
+                         // Remove new endpoints from old endpoints based on address
+                         strictEndpoints = oldEndpoints.without(newEndpoints.endpoints());
+
+                         if (strictEndpoints.size() > 1)
+                             throw new AssertionError("Expected <= 1 endpoint but found " + strictEndpoints);
+
+                         //We have to check the source filters here to see if they will remove any replicas
+                         //required for strict consistency
+                         if (!all(strictEndpoints, testSourceFilters))
+                             throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + buildErrorMessage(sourceFilters, strictEndpoints));
+
+                         //If we are transitioning from transient to full and the set of replicas for the range is not changing
+                         //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely
+                         //since we are already a transient replica and the existing replica remains.
+                         //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
+                         //So it's an error if we don't find what we need.
+                         if (strictEndpoints.isEmpty() && toFetch.isTransient())
+                             throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
+
+                         if (!any(strictEndpoints, isSufficient))
+                         {
+                             // need an additional replica; include all our filters, to ensure we include a matching node
+                             Optional<Replica> fullReplica = Iterables.<Replica>tryFind(oldEndpoints, and(isSufficient, testSourceFilters)).toJavaUtil();
+                             if (fullReplica.isPresent())
+                                 strictEndpoints = Endpoints.concat(strictEndpoints, EndpointsForRange.of(fullReplica.get()));
+                             else
+                                 throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + buildErrorMessage(sourceFilters, oldEndpoints));
+                         }
+                     }
+                     else
+                     {
+                         strictEndpoints = sorted.apply(oldEndpoints.filter(and(isSufficient, testSourceFilters)));
+                     }
+
+                     sources = strictEndpoints;
+                 }
+                 else
+                 {
+                     //Without strict consistency we have given up on correctness so no point in fetching from
+                     //a random full + transient replica since it's also likely to lose data
+                     //Also apply testSourceFilters that were given to us so we can safely select a single source
+                     sources = sorted.apply(oldEndpoints.filter(and(isSufficient, testSourceFilters)));
+                     //Limit it to just the first possible source, we don't need more than one and downstream
+                     //will fetch from every source we supply
+                     sources = sources.size() > 0 ? sources.subList(0, 1) : sources;
+                 }
+
+                 // storing range and preferred endpoint set
+                 rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE);
+                 logger.debug("Endpoints to fetch for {} are {}", toFetch, sources);
+             }
+
+             EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
+             if (addressList == null)
+                 throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch);
+
+             /*
+              * When we move forwards (shrink our bucket) we are the one losing a range and no one else loses
+              * from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica
+              * and the other is a transient replica. So we need to fetch from two places in that case for the full range we gain.
+              * For a transient range we only need to fetch from one.
+              */
+             if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1))
+                 throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList));
+
+             //We must have enough stuff to fetch from
+             if (!any(addressList, isSufficient))
+             {
+                 if (strat.getReplicationFactor().allReplicas == 1)
+                 {
+                     if (useStrictConsistency)
+                     {
+                         logger.warn("A node required to move the data consistently is down");
+                         throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " +
+                                                         "Ensure this keyspace contains replicas in the source datacenter.");
+                     }
+                     else
+                         logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " +
+                                     "Keyspace might be missing data.", toFetch, keyspace);
+                 }
+                 else
+                 {
+                     if (useStrictConsistency)
+                         logger.warn("A node required to move the data consistently is down");
+                     throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace);
+                 }
+             }
+         }
+         return rangesToFetchWithPreferredEndpoints.asImmutableView();
+     }
 
     /**
      * The preferred endpoint list is the wrong format because it is keyed by Replica (this node) rather than the source
      * endpoint we will fetch from which streaming wants.
-     * @param preferredEndpoints
-     * @return
      */
     public static Multimap<InetAddressAndPort, FetchReplica> convertPreferredEndpointsToWorkMap(EndpointsByReplica preferredEndpoints)
     {
@@ -505,7 +553,7 @@ public class RangeStreamer
         {
             for (Replica source : e.getValue())
             {
-                assert (e.getKey()).isLocal();
+                assert e.getKey().isLocal();
                 assert !source.isLocal();
                 workMap.put(source.endpoint(), new FetchReplica(e.getKey(), source));
             }
@@ -518,7 +566,8 @@ public class RangeStreamer
      * Optimized version that also outputs the final work map
      */
     private static Multimap<InetAddressAndPort, FetchReplica> getOptimizedWorkMap(EndpointsByReplica rangesWithSources,
-                                                                                  Collection<Predicate<Replica>> sourceFilters, String keyspace)
+                                                                                  Collection<SourceFilter> sourceFilters,
+                                                                                  String keyspace)
     {
         //For now we just aren't going to use the optimized range fetch map with transient replication to shrink
         //the surface area to test and introduce bugs.
@@ -531,10 +580,11 @@ public class RangeStreamer
             unwrapped.put(entry.getKey().range(), entry.getValue());
         }
 
-        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(unwrapped.asImmutableView(), sourceFilters, keyspace);
+        EndpointsByRange unwrappedView = unwrapped.asImmutableView();
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(unwrappedView, sourceFilters, keyspace);
         Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = calculator.getRangeFetchMap();
         logger.info("Output from RangeFetchMapCalculator for keyspace {}", keyspace);
-        validateRangeFetchMap(unwrapped.asImmutableView(), rangeFetchMapMap, keyspace);
+        validateRangeFetchMap(unwrappedView, rangeFetchMapMap, keyspace);
 
         //Need to rewrap as Replicas
         Multimap<InetAddressAndPort, FetchReplica> wrapped = HashMultimap.create();
@@ -562,9 +612,6 @@ public class RangeStreamer
 
     /**
      * Verify that source returned for each range is correct
-     * @param rangesWithSources
-     * @param rangeFetchMapMap
-     * @param keyspace
      */
     private static void validateRangeFetchMap(EndpointsByRange rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap, String keyspace)
     {
@@ -588,7 +635,7 @@ public class RangeStreamer
 
     // For testing purposes
     @VisibleForTesting
-    Multimap<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch()
+    Map<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch()
     {
         return toFetch;
     }
@@ -600,16 +647,19 @@ public class RangeStreamer
             sources.asMap().forEach((source, fetchReplicas) -> {
 
                 // filter out already streamed ranges
-                RangesAtEndpoint available = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner);
+                SystemKeyspace.AvailableRanges available = stateStore.getAvailableRanges(keyspace, metadata.partitioner);
 
                 Predicate<FetchReplica> isAvailable = fetch -> {
-                    Replica availableRange =  available.byRange().get(fetch.local.range());
-                    if (availableRange == null)
+                    boolean isInFull = available.full.contains(fetch.local.range());
+                    boolean isInTrans = available.trans.contains(fetch.local.range());
+
+                    if (!isInFull && !isInTrans)
                         //Range is unavailable
                         return false;
+
                     if (fetch.local.isFull())
                         //For full, pick only replicas with matching transientness
-                        return availableRange.isFull() == fetch.remote.isFull();
+                        return isInFull == fetch.remote.isFull();
 
                     // Any transient or full will do
                     return true;
@@ -617,22 +667,16 @@ public class RangeStreamer
 
                 List<FetchReplica> remaining = fetchReplicas.stream().filter(not(isAvailable)).collect(Collectors.toList());
 
-                if (remaining.size() < available.size())
+                if (remaining.size() < available.full.size() + available.trans.size())
                 {
                     List<FetchReplica> skipped = fetchReplicas.stream().filter(isAvailable).collect(Collectors.toList());
                     logger.info("Some ranges of {} are already available. Skipping streaming those ranges. Skipping {}. Fully available {} Transiently available {}",
-                                fetchReplicas, skipped, available.filter(Replica::isFull).ranges(), available.filter(Replica::isTransient).ranges());
+                                fetchReplicas, skipped, available.full, available.trans);
                 }
 
                 if (logger.isTraceEnabled())
                     logger.trace("{}ing from {} ranges {}", description, source, StringUtils.join(remaining, ", "));
 
-                //At the other end the distinction between full and transient is ignored it just used the transient status
-                //of the Replica objects we send to determine what to send. The real reason we have this split down to
-                //StreamRequest is that on completion StreamRequest is used to write to the system table tracking
-                //what has already been streamed. At that point since we only have the local Replica instances so we don't
-                //know what we got from the remote. We preserve that here by splitting based on the remotes transient
-                //status.
                 InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
                 RangesAtEndpoint full = remaining.stream()
                         .filter(pair -> pair.remote.isFull())

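To illustrate the SourceFilter contract introduced in this file, here is a hypothetical rack-based filter; it is not part of this patch, and the getRack call assumes the usual IEndpointSnitch API:

    public static class SingleRackFilter implements RangeStreamer.SourceFilter
    {
        private final IEndpointSnitch snitch;
        private final String sourceRack;

        public SingleRackFilter(IEndpointSnitch snitch, String sourceRack)
        {
            this.snitch = snitch;
            this.sourceRack = sourceRack;
        }

        @Override
        public boolean apply(Replica replica)
        {
            // Accept only sources in the requested rack.
            return snitch.getRack(replica.endpoint()).equals(sourceRack);
        }

        @Override
        public String message(Replica replica)
        {
            return "Filtered " + replica + " out because it does not belong to rack " + sourceRack;
        }
    }

Such a filter would be registered like the built-in ones, via streamer.addSourceFilter(new SingleRackFilter(snitch, "rack1")), and its message() is what buildErrorMessage surfaces when strict consistency cannot be satisfied.
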
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/dht/StreamStateStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/StreamStateStore.java b/src/java/org/apache/cassandra/dht/StreamStateStore.java
index 3144e81..e62bc04 100644
--- a/src/java/org/apache/cassandra/dht/StreamStateStore.java
+++ b/src/java/org/apache/cassandra/dht/StreamStateStore.java
@@ -20,7 +20,8 @@ package org.apache.cassandra.dht;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.cassandra.locator.RangesAtEndpoint;
+import com.google.common.collect.Streams;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +39,7 @@ public class StreamStateStore implements StreamEventHandler
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamStateStore.class);
 
-    public RangesAtEndpoint getAvailableRanges(String keyspace, IPartitioner partitioner)
+    public SystemKeyspace.AvailableRanges getAvailableRanges(String keyspace, IPartitioner partitioner)
     {
         return SystemKeyspace.getAvailableRanges(keyspace, partitioner);
     }
@@ -54,8 +55,11 @@ public class StreamStateStore implements StreamEventHandler
     @VisibleForTesting
     public boolean isDataAvailable(String keyspace, Token token)
     {
-        RangesAtEndpoint availableRanges = getAvailableRanges(keyspace, token.getPartitioner());
-        return availableRanges.ranges().stream().anyMatch(range -> range.contains(token));
+        SystemKeyspace.AvailableRanges availableRanges = getAvailableRanges(keyspace, token.getPartitioner());
+
+        return Streams.concat(availableRanges.full.stream(),
+                              availableRanges.trans.stream())
+                      .anyMatch(range -> range.contains(token));
     }
 
     /**

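A small usage sketch of the adjusted availability check; the keyspace and token are placeholders, and the no-arg construction assumes the store's default constructor:

    StreamStateStore stateStore = new StreamStateStore();

    // True if the token falls in any range already streamed to this node,
    // whether it came from a full or a transient source.
    boolean available = stateStore.isDataAvailable("my_keyspace", token);
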
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
index 1773173..f57c28e 100644
--- a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
+++ b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
@@ -37,6 +37,7 @@ import java.util.Set;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
 
+import static com.google.common.collect.Iterables.all;
 import static org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict.*;
 
 /**
@@ -302,6 +303,11 @@ public class RangesAtEndpoint extends AbstractReplicaCollection<RangesAtEndpoint
                 .collect(collector(dummy));
     }
 
+    public static boolean isDummyList(RangesAtEndpoint ranges)
+    {
+        return all(ranges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0"));
+    }
+
     /**
      * @return concatenate two DISJOINT collections together
      */

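The new isDummyList helper guards against mistaking a placeholder collection, keyed by the 0.0.0.0:0 dummy endpoint, for real ranges. A hypothetical defensive use, not part of this patch:

    if (RangesAtEndpoint.isDummyList(ranges))
        throw new IllegalArgumentException("Placeholder ranges cannot be streamed: " + ranges);
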
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/service/RangeRelocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeRelocator.java b/src/java/org/apache/cassandra/service/RangeRelocator.java
new file mode 100644
index 0000000..f2af3db
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/RangeRelocator.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Multimap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.RangeStreamer;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.EndpointsByReplica;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.RangesByEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+@VisibleForTesting
+public class RangeRelocator
+{
+    private static final Logger logger = LoggerFactory.getLogger(StorageService.class);
+
+    private final StreamPlan streamPlan = new StreamPlan(StreamOperation.RELOCATION);
+    private final InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
+    private final TokenMetadata tokenMetaCloneAllSettled;
+    // clone to avoid concurrent modification in calculateNaturalReplicas
+    private final TokenMetadata tokenMetaClone;
+    private final Collection<Token> tokens;
+    private final List<String> keyspaceNames;
+
+
+    RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames, TokenMetadata tmd)
+    {
+        this.tokens = tokens;
+        this.keyspaceNames = keyspaceNames;
+        this.tokenMetaCloneAllSettled = tmd.cloneAfterAllSettled();
+        // clone to avoid concurrent modification in calculateNaturalReplicas
+        this.tokenMetaClone = tmd.cloneOnlyTokenMap();
+    }
+
+    @VisibleForTesting
+    public RangeRelocator()
+    {
+        this.tokens = null;
+        this.keyspaceNames = null;
+        this.tokenMetaCloneAllSettled = null;
+        this.tokenMetaClone = null;
+    }
+
+    /**
+     * Wrapper that supplies accessors to the real implementations of the various dependencies for this method
+     */
+    private static Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> calculateRangesToFetchWithPreferredEndpoints(RangesAtEndpoint fetchRanges,
+                                                                                                                         AbstractReplicationStrategy strategy,
+                                                                                                                         String keyspace,
+                                                                                                                         TokenMetadata tmdBefore,
+                                                                                                                         TokenMetadata tmdAfter)
+    {
+        EndpointsByReplica preferredEndpoints =
+        RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(DatabaseDescriptor.getEndpointSnitch()::sortedByProximity,
+                                                                   strategy,
+                                                                   fetchRanges,
+                                                                   StorageService.useStrictConsistency,
+                                                                   tmdBefore,
+                                                                   tmdAfter,
+                                                                   keyspace,
+                                                                   Arrays.asList(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance),
+                                                                                 new RangeStreamer.ExcludeLocalNodeFilter()));
+        return RangeStreamer.convertPreferredEndpointsToWorkMap(preferredEndpoints);
+    }
+
+    /**
+     * Calculates the endpoints to stream current ranges to, if needed.
+     * In some situations the node will handle current ranges as part of the new ranges.
+     **/
+    public static RangesByEndpoint calculateRangesToStreamWithEndpoints(RangesAtEndpoint streamRanges,
+                                                                        AbstractReplicationStrategy strat,
+                                                                        TokenMetadata tmdBefore,
+                                                                        TokenMetadata tmdAfter)
+    {
+        RangesByEndpoint.Mutable endpointRanges = new RangesByEndpoint.Mutable();
+        for (Replica toStream : streamRanges)
+        {
+            //If the range we are sending is full only send it to the new full replica
+            //There will also be a new transient replica we need to send the data to, but not
+            //the repaired data
+            EndpointsForRange oldEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdBefore);
+            EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdAfter);
+            logger.debug("Need to stream {}, current endpoints {}, new endpoints {}", toStream, oldEndpoints, newEndpoints);
+
+            for (Replica newEndpoint : newEndpoints)
+            {
+                Replica oldEndpoint = oldEndpoints.byEndpoint().get(newEndpoint.endpoint());
+
+                // Nothing to do
+                if (newEndpoint.equals(oldEndpoint))
+                    continue;
+
+                // Completely new range for this endpoint
+                if (oldEndpoint == null)
+                {
+                    if (toStream.isTransient() && newEndpoint.isFull())
+                        throw new AssertionError(String.format("Need to stream %s, but only have %s which is transient and not full", newEndpoint, toStream));
+
+                    for (Range<Token> intersection : newEndpoint.range().intersectionWith(toStream.range()))
+                    {
+                        endpointRanges.put(newEndpoint.endpoint(), newEndpoint.decorateSubrange(intersection));
+                    }
+                }
+                else
+                {
+                    Set<Range<Token>> subsToStream = Collections.singleton(toStream.range());
+
+                    //First subtract what we already have
+                    if (oldEndpoint.isFull() == newEndpoint.isFull() || oldEndpoint.isFull())
+                        subsToStream = toStream.range().subtract(oldEndpoint.range());
+
+                    //Now we only stream what is still replicated
+                    subsToStream.stream()
+                                .flatMap(range -> range.intersectionWith(newEndpoint.range()).stream())
+                                .forEach(tokenRange -> endpointRanges.put(newEndpoint.endpoint(), newEndpoint.decorateSubrange(tokenRange)));
+                }
+            }
+        }
+        return endpointRanges.asImmutableView();
+    }
+
+    public void calculateToFromStreams()
+    {
+        logger.debug("Current tmd: {}, Updated tmd: {}", tokenMetaClone, tokenMetaCloneAllSettled);
+
+        for (String keyspace : keyspaceNames)
+        {
+            // replication strategy of the current keyspace
+            AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
+
+            logger.info("Calculating ranges to stream and request for keyspace {}", keyspace);
+            //From what I have seen we only ever call this with a single token from StorageService.move(Token)
+            for (Token newToken : tokens)
+            {
+                Collection<Token> currentTokens = tokenMetaClone.getTokens(localAddress);
+                if (currentTokens.size() > 1 || currentTokens.isEmpty())
+                {
+                    throw new AssertionError("Unexpected current tokens: " + currentTokens);
+                }
+
+                // calculated parts of the ranges to request/stream from/to nodes in the ring
+                Pair<RangesAtEndpoint, RangesAtEndpoint> streamAndFetchOwnRanges;
+
+                //In the single node token move there is nothing to do and Range subtraction is broken
+                //so it's easier to just identify this case up front.
+                if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort())).size() > 1)
+                {
+                    // getting collection of the currently used ranges by this keyspace
+                    RangesAtEndpoint currentReplicas = strategy.getAddressReplicas(localAddress);
+
+                    // collection of ranges which this node will serve after move to the new token
+                    RangesAtEndpoint updatedReplicas = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
+
+                    streamAndFetchOwnRanges = calculateStreamAndFetchRanges(currentReplicas, updatedReplicas);
+                }
+                else
+                {
+                     streamAndFetchOwnRanges = Pair.create(RangesAtEndpoint.empty(localAddress), RangesAtEndpoint.empty(localAddress));
+                }
+
+                RangesByEndpoint rangesToStream = calculateRangesToStreamWithEndpoints(streamAndFetchOwnRanges.left, strategy, tokenMetaClone, tokenMetaCloneAllSettled);
+                logger.info("Endpoint ranges to stream to " + rangesToStream);
+
+                // stream ranges
+                for (InetAddressAndPort address : rangesToStream.keySet())
+                {
+                    logger.debug("Will stream range {} of keyspace {} to endpoint {}", rangesToStream.get(address), keyspace, address);
+                    RangesAtEndpoint ranges = rangesToStream.get(address);
+                    streamPlan.transferRanges(address, keyspace, ranges);
+                }
+
+                Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> rangesToFetch = calculateRangesToFetchWithPreferredEndpoints(streamAndFetchOwnRanges.right, strategy, keyspace, tokenMetaClone, tokenMetaCloneAllSettled);
+
+                // stream requests
+                rangesToFetch.asMap().forEach((address, sourceAndOurReplicas) -> {
+                    RangesAtEndpoint full = sourceAndOurReplicas.stream()
+                            .filter(pair -> pair.remote.isFull())
+                            .map(pair -> pair.local)
+                            .collect(RangesAtEndpoint.collector(localAddress));
+                    RangesAtEndpoint trans = sourceAndOurReplicas.stream()
+                            .filter(pair -> pair.remote.isTransient())
+                            .map(pair -> pair.local)
+                            .collect(RangesAtEndpoint.collector(localAddress));
+                    logger.debug("Will request range {} of keyspace {} from endpoint {}", rangesToFetch.get(address), keyspace, address);
+                    streamPlan.requestRanges(address, keyspace, full, trans);
+                });
+
+                logger.debug("Keyspace {}: work map {}.", keyspace, rangesToFetch);
+            }
+        }
+    }
+
+    /**
+     * Calculate pair of ranges to stream/fetch for given two range collections
+     * (current ranges for keyspace and ranges after move to new token)
+     *
+     * With transient replication the added wrinkle is that if a range transitions from full to transient then
+     * we need to stream the range despite the fact that we are retaining it as transient. Some replica
+     * somewhere needs to transition from transient to full and we will be the source.
+     *
+     * If the range is transient and is transitioning to full then always fetch, even if the range was already
+     * transient, since a transient replica obviously needs to fetch data to become full.
+     *
+     * This is why there is a continue after checking for intersection: intersection alone is not sufficient reason
+     * to do the subtraction, since we might need to stream/fetch the data anyway.
+     *
+     * @param currentRanges collection of the ranges by current token
+     * @param updatedRanges collection of the ranges after token is changed
+     * @return pair of ranges to stream/fetch for given current and updated range collections
+     */
+    public static Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRanges(RangesAtEndpoint currentRanges, RangesAtEndpoint updatedRanges)
+    {
+        RangesAtEndpoint.Builder toStream = RangesAtEndpoint.builder(currentRanges.endpoint());
+        RangesAtEndpoint.Builder toFetch  = RangesAtEndpoint.builder(currentRanges.endpoint());
+        logger.debug("Calculating toStream");
+        computeRanges(currentRanges, updatedRanges, toStream);
+
+        logger.debug("Calculating toFetch");
+        computeRanges(updatedRanges, currentRanges, toFetch);
+
+        logger.debug("To stream {}", toStream);
+        logger.debug("To fetch {}", toFetch);
+        return Pair.create(toStream.build(), toFetch.build());
+    }
+
+    private static void computeRanges(RangesAtEndpoint srcRanges, RangesAtEndpoint dstRanges, RangesAtEndpoint.Builder ranges)
+    {
+        for (Replica src : srcRanges)
+        {
+            boolean intersect = false;
+            RangesAtEndpoint remainder = null;
+            for (Replica dst : dstRanges)
+            {
+                logger.debug("Comparing {} and {}", src, dst);
+                // Stream the full range if there's no intersection
+                if (!src.intersectsOnRange(dst))
+                    continue;
+
+                // If we're transitioning from full to transient
+                if (src.isFull() && dst.isTransient())
+                    continue;
+
+                if (remainder == null)
+                {
+                    remainder = src.subtractIgnoreTransientStatus(dst.range());
+                }
+                else
+                {
+                    // Re-subtract ranges to avoid overstreaming in cases where a single range is split or merged
+                    RangesAtEndpoint.Builder newRemainder = new RangesAtEndpoint.Builder(remainder.endpoint());
+                    for (Replica replica : remainder)
+                        newRemainder.addAll(replica.subtractIgnoreTransientStatus(dst.range()));
+                    remainder = newRemainder.build();
+                }
+                intersect = true;
+            }
+
+            if (!intersect)
+            {
+                assert remainder == null;
+                logger.debug("    Doesn't intersect adding {}", src);
+                ranges.add(src); // should stream whole old range
+            }
+            else
+            {
+                ranges.addAll(remainder);
+                logger.debug("    Intersects adding {}", remainder);
+            }
+        }
+    }
+
+    public Future<StreamState> stream()
+    {
+        return streamPlan.execute();
+    }
+
+    public boolean streamsNeeded()
+    {
+        return !streamPlan.isEmpty();
+    }
+}
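
The computeRanges helper above is plain interval arithmetic over replicas: a source range is kept whole when it does not intersect any destination, kept whole when a full replica is turning transient (some other node has to be seeded from this copy), and otherwise repeatedly reduced by subtracting what the destinations already cover. The following self-contained sketch mirrors that rule with a hypothetical [left, right) interval type; it is an illustration only and does not use Cassandra's Range, Replica or RangesAtEndpoint classes.

import java.util.ArrayList;
import java.util.List;

public class ComputeRangesSketch
{
    // [left, right) interval plus full/transient status; a stand-in for a Replica.
    record Rep(long left, long right, boolean full)
    {
        boolean intersects(Rep o) { return left < o.right && o.left < right; }

        // 0, 1 or 2 leftover pieces after removing the overlap with o, keeping this status.
        List<Rep> subtract(Rep o)
        {
            List<Rep> out = new ArrayList<>();
            if (!intersects(o)) { out.add(this); return out; }
            if (left < o.left)   out.add(new Rep(left, o.left, full));
            if (o.right < right) out.add(new Rep(o.right, right, full));
            return out;
        }
    }

    static List<Rep> computeRanges(List<Rep> srcRanges, List<Rep> dstRanges)
    {
        List<Rep> result = new ArrayList<>();
        for (Rep src : srcRanges)
        {
            boolean intersect = false;
            List<Rep> remainder = null;
            for (Rep dst : dstRanges)
            {
                if (!src.intersects(dst))
                    continue;                               // this destination contributes nothing
                if (src.full() && !dst.full())
                    continue;                               // full -> transient: keep the whole range, no subtraction
                if (remainder == null)
                {
                    remainder = src.subtract(dst);
                }
                else
                {
                    List<Rep> next = new ArrayList<>();
                    for (Rep piece : remainder)
                        next.addAll(piece.subtract(dst));   // re-subtract to avoid counting a range twice
                    remainder = next;
                }
                intersect = true;
            }
            if (!intersect)
                result.add(src);                            // no overlap at all: take the whole old range
            else
                result.addAll(remainder);                   // only the parts not already covered
        }
        return result;
    }

    public static void main(String[] args)
    {
        // Old ownership: full [0, 100). New ownership: full [0, 40) plus transient [40, 100).
        List<Rep> current = List.of(new Rep(0, 100, true));
        List<Rep> updated = List.of(new Rep(0, 40, true), new Rep(40, 100, false));
        System.out.println("toStream = " + computeRanges(current, updated));
        System.out.println("toFetch  = " + computeRanges(updated, current));
    }
}

Run as-is, this should print toStream = [Rep[left=40, right=100, full=true]] and toFetch = [], i.e. the portion that goes from full to transient is streamed whole and nothing needs to be fetched, which is the case the calculateStreamAndFetchRanges javadoc calls out.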

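For the stream side, calculateRangesToStreamWithEndpoints above decides per destination endpoint which slice of a range it should receive: a brand-new replica gets the intersection of its new range with the range being streamed (and a purely transient local copy must never be the seed for a new full replica); an unchanged replica gets nothing; a replica that already held the range gets only the leftover after subtracting what it already has, intersected with what it will still replicate, unless it is moving from transient to full, in which case nothing is subtracted. A standalone sketch of those three cases, again with a hypothetical interval type rather than Cassandra's Range/Replica classes:

import java.util.ArrayList;
import java.util.List;

public class StreamTargetSketch
{
    // [left, right) interval with a full/transient flag; a stand-in for a Replica's range.
    record Span(long left, long right, boolean full)
    {
        boolean intersects(Span o) { return left < o.right && o.left < right; }
        Span intersection(Span o)  { return new Span(Math.max(left, o.left), Math.min(right, o.right), full); }

        List<Span> subtract(Span o)
        {
            List<Span> out = new ArrayList<>();
            if (!intersects(o)) { out.add(this); return out; }
            if (left < o.left)   out.add(new Span(left, o.left, full));
            if (o.right < right) out.add(new Span(o.right, right, full));
            return out;
        }
    }

    // What to stream to one endpoint for one local range being handed off; oldReplica is null for a new endpoint.
    static List<Span> toStreamForEndpoint(Span toStream, Span oldReplica, Span newReplica)
    {
        List<Span> result = new ArrayList<>();
        if (newReplica.equals(oldReplica))
            return result;                                   // endpoint keeps exactly what it had: nothing to do

        if (oldReplica == null)                              // completely new range for this endpoint
        {
            if (!toStream.full() && newReplica.full())
                throw new AssertionError("only have a transient copy but must seed a full replica");
            if (toStream.intersects(newReplica))
                result.add(newReplica.intersection(toStream));
            return result;
        }

        // Existing data is sufficient unless the endpoint goes from transient to full:
        // subtract what it already has, then keep only what it will still replicate.
        List<Span> subs = (oldReplica.full() == newReplica.full() || oldReplica.full())
                          ? toStream.subtract(oldReplica)
                          : List.of(toStream);
        for (Span s : subs)
            if (s.intersects(newReplica))
                result.add(newReplica.intersection(s));
        return result;
    }

    public static void main(String[] args)
    {
        Span streaming = new Span(0, 100, true);
        // New endpoint that will fully replicate [50, 100): it receives exactly that slice.
        System.out.println(toStreamForEndpoint(streaming, null, new Span(50, 100, true)));
        // Endpoint that already fully held [0, 60) and will hold [0, 80): it receives only [60, 80).
        System.out.println(toStreamForEndpoint(streaming, new Span(0, 60, true), new Span(0, 80, true)));
    }
}
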
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a979f1c..391598c 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -236,7 +236,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private Collection<Token> bootstrapTokens = null;
 
     // true when keeping strict consistency while bootstrapping
-    private static final boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true"));
+    public static final boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true"));
     private static final boolean allowSimultaneousMoves = Boolean.parseBoolean(System.getProperty("cassandra.consistent.simultaneousmoves.allow","false"));
     private static final boolean joinRing = Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"));
     private boolean replacing;
@@ -1227,7 +1227,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                                                        streamStateStore,
                                                        false,
                                                        DatabaseDescriptor.getStreamingConnectionsPerHost());
-            streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
             if (sourceDc != null)
                 streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));
 
@@ -4316,208 +4315,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             logger.debug("Successfully moved to new token {}", getLocalTokens().iterator().next());
     }
 
-    @VisibleForTesting
-    public static class RangeRelocator
-    {
-        private final StreamPlan streamPlan = new StreamPlan(StreamOperation.RELOCATION);
-        private final InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
-        private final TokenMetadata tokenMetaCloneAllSettled;
-        // clone to avoid concurrent modification in calculateNaturalReplicas
-        private final TokenMetadata tokenMetaClone;
-        private final Collection<Token> tokens;
-        private final List<String> keyspaceNames;
-
-
-        private RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames, TokenMetadata tmd)
-        {
-            this.tokens = tokens;
-            this.keyspaceNames = keyspaceNames;
-            this.tokenMetaCloneAllSettled = tmd.cloneAfterAllSettled();
-            // clone to avoid concurrent modification in calculateNaturalReplicas
-            this.tokenMetaClone = tmd.cloneOnlyTokenMap();
-        }
-
-        @VisibleForTesting
-        public RangeRelocator()
-        {
-            this.tokens = null;
-            this.keyspaceNames = null;
-            this.tokenMetaCloneAllSettled = null;
-            this.tokenMetaClone = null;
-        }
-
-        /**
-         * Wrapper that supplies accessors to the real implementations of the various dependencies for this method
-         */
-        private Multimap<InetAddressAndPort, FetchReplica> calculateRangesToFetchWithPreferredEndpoints(AbstractReplicationStrategy strategy, RangesAtEndpoint fetchRanges, String keyspace)
-        {
-            EndpointsByReplica preferredEndpoints =
-            RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) -> DatabaseDescriptor.getEndpointSnitch().sortedByProximity(address, replicas),
-                                                                       strategy,
-                                                                       fetchRanges,
-                                                                       useStrictConsistency,
-                                                                       tokenMetaClone,
-                                                                       tokenMetaCloneAllSettled,
-                                                                       RangeStreamer.ALIVE_PREDICATE,
-                                                                       keyspace,
-                                                                       Collections.emptyList());
-            return RangeStreamer.convertPreferredEndpointsToWorkMap(preferredEndpoints);
-        }
-
-        /**
-         * calculating endpoints to stream current ranges to if needed
-         * in some situations node will handle current ranges as part of the new ranges
-         **/
-        public RangesByEndpoint calculateRangesToStreamWithEndpoints(RangesAtEndpoint streamRanges,
-                                                                     AbstractReplicationStrategy strat,
-                                                                     TokenMetadata tmdBefore,
-                                                                     TokenMetadata tmdAfter)
-        {
-            RangesByEndpoint.Mutable endpointRanges = new RangesByEndpoint.Mutable();
-            for (Replica toStream : streamRanges)
-            {
-                //If the range we are sending is full only send it to the new full replica
-                //There will also be a new transient replica we need to send the data to, but not
-                //the repaired data
-                EndpointsForRange currentEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdBefore);
-                EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdAfter);
-                logger.debug("Need to stream {}, current endpoints {}, new endpoints {}", toStream, currentEndpoints, newEndpoints);
-
-                for (Replica current : currentEndpoints)
-                {
-                    for (Replica updated : newEndpoints)
-                    {
-                        if (current.endpoint().equals(updated.endpoint()))
-                        {
-                            //Nothing to do
-                            if (current.equals(updated))
-                                break;
-
-                            //In these two (really three) cases the existing data is sufficient and we should subtract whatever is already replicated
-                            if (current.isFull() == updated.isFull() || current.isFull())
-                            {
-                                //First subtract what we already have
-                                Set<Range<Token>> subsToStream = toStream.range().subtract(current.range());
-                                //Now we only stream what is still replicated
-                                subsToStream = subsToStream.stream().flatMap(range -> range.intersectionWith(updated.range()).stream()).collect(Collectors.toSet());
-                                for (Range<Token> subrange : subsToStream)
-                                {
-                                    //Only stream what intersects with what is in the new world
-                                    Set<Range<Token>> intersections = subrange.intersectionWith(updated.range());
-                                    for (Range<Token> intersection : intersections)
-                                    {
-                                        endpointRanges.put(updated.endpoint(), updated.decorateSubrange(intersection));
-                                    }
-                                }
-                            }
-                            else
-                            {
-                                for (Range<Token> intersection : toStream.range().intersectionWith(updated.range()))
-                                {
-                                    endpointRanges.put(updated.endpoint(), updated.decorateSubrange(intersection));
-                                }
-                            }
-                        }
-                    }
-                }
-
-                for (Replica updated : newEndpoints)
-                {
-                    if (!currentEndpoints.byEndpoint().containsKey(updated.endpoint()))
-                    {
-                        // Completely new range for this endpoint
-                        if (toStream.isTransient() && updated.isFull())
-                        {
-                            throw new AssertionError(String.format("Need to stream %s, but only have %s which is transient and not full", updated, toStream));
-                        }
-                        for (Range<Token> intersection : updated.range().intersectionWith(toStream.range()))
-                        {
-                            endpointRanges.put(updated.endpoint(), updated.decorateSubrange(intersection));
-                        }
-                    }
-                }
-            }
-            return endpointRanges.asImmutableView();
-        }
-
-        private void calculateToFromStreams()
-        {
-            logger.debug("Current tmd " + tokenMetaClone);
-            logger.debug("Updated tmd " + tokenMetaCloneAllSettled);
-            for (String keyspace : keyspaceNames)
-            {
-                // replication strategy of the current keyspace
-                AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
-                // getting collection of the currently used ranges by this keyspace
-                RangesAtEndpoint currentReplicas = strategy.getAddressReplicas(localAddress);
-
-                logger.info("Calculating ranges to stream and request for keyspace {}", keyspace);
-                //From what I have seen we only ever call this with a single token from StorageService.move(Token)
-                for (Token newToken : tokens)
-                {
-                    Collection<Token> currentTokens = tokenMetaClone.getTokens(localAddress);
-                    if (currentTokens.size() > 1 || currentTokens.isEmpty())
-                    {
-                        throw new AssertionError("Unexpected current tokens: " + currentTokens);
-                    }
-
-                    // collection of ranges which this node will serve after move to the new token
-                    RangesAtEndpoint updatedReplicas = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
-
-                    // calculated parts of the ranges to request/stream from/to nodes in the ring
-                    Pair<RangesAtEndpoint, RangesAtEndpoint> streamAndFetchOwnRanges = Pair.create(RangesAtEndpoint.empty(localAddress), RangesAtEndpoint.empty(localAddress));
-                    //In the single node token move there is nothing to do and Range subtraction is broken
-                    //so it's easier to just identify this case up front.
-                    if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()
-)).size() > 1)
-                    {
-                        streamAndFetchOwnRanges = calculateStreamAndFetchRanges(currentReplicas, updatedReplicas);
-                    }
-
-                    Multimap<InetAddressAndPort, FetchReplica> workMap = calculateRangesToFetchWithPreferredEndpoints(strategy, streamAndFetchOwnRanges.right, keyspace);
-
-                    RangesByEndpoint endpointRanges = calculateRangesToStreamWithEndpoints(streamAndFetchOwnRanges.left, strategy, tokenMetaClone, tokenMetaCloneAllSettled);
-
-                    logger.info("Endpoint ranges to stream to " + endpointRanges);
-
-                    // stream ranges
-                    for (InetAddressAndPort address : endpointRanges.keySet())
-                    {
-                        logger.debug("Will stream range {} of keyspace {} to endpoint {}", endpointRanges.get(address), keyspace, address);
-                        RangesAtEndpoint ranges = endpointRanges.get(address);
-                        streamPlan.transferRanges(address, keyspace, ranges);
-                    }
-
-                    // stream requests
-                    workMap.asMap().forEach((address, sourceAndOurReplicas) -> {
-                        RangesAtEndpoint full = sourceAndOurReplicas.stream()
-                                .filter(pair -> pair.remote.isFull())
-                                .map(pair -> pair.local)
-                                .collect(RangesAtEndpoint.collector(localAddress));
-                        RangesAtEndpoint transientReplicas = sourceAndOurReplicas.stream()
-                                .filter(pair -> pair.remote.isTransient())
-                                .map(pair -> pair.local)
-                                .collect(RangesAtEndpoint.collector(localAddress));
-                        logger.debug("Will request range {} of keyspace {} from endpoint {}", workMap.get(address), keyspace, address);
-                        streamPlan.requestRanges(address, keyspace, full, transientReplicas);
-                    });
-
-                    logger.debug("Keyspace {}: work map {}.", keyspace, workMap);
-                }
-            }
-        }
-
-        public Future<StreamState> stream()
-        {
-            return streamPlan.execute();
-        }
-
-        public boolean streamsNeeded()
-        {
-            return !streamPlan.isEmpty();
-        }
-    }
-
     public String getRemovalStatus()
     {
         return getRemovalStatus(false);
@@ -5271,115 +5068,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return streamPlan.execute();
     }
 
-    /**
-     * Calculate pair of ranges to stream/fetch for given two range collections
-     * (current ranges for keyspace and ranges after move to new token)
-     *
-     * With transient replication the added wrinkle is that if a range transitions from full to transient then
-     * we need to stream the range despite the fact that we are retaining it as transient. Some replica
-     * somewhere needs to transition from transient to full and we wll be the source.
-     *
-     * If the range is transient and is transitioning to full then always fetch even if the range was already transient
-     * since a transiently replicated obviously needs to fetch data to become full.
-     *
-     * This why there is a continue after checking for instersection because intersection is not sufficient reason
-     * to do the subtraction since we might need to stream/fetch data anyways.
-     *
-     * @param current collection of the ranges by current token
-     * @param updated collection of the ranges after token is changed
-     * @return pair of ranges to stream/fetch for given current and updated range collections
-     */
-    public static Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRanges(RangesAtEndpoint current, RangesAtEndpoint updated)
-    {
-        // FIXME: transient replication
-        // this should always be the local node, except for tests TODO: assert this
-        RangesAtEndpoint.Builder toStream = RangesAtEndpoint.builder(current.endpoint());
-        RangesAtEndpoint.Builder toFetch  = RangesAtEndpoint.builder(current.endpoint());
-
-        logger.debug("Calculating toStream");
-        for (Replica r1 : current)
-        {
-            boolean intersect = false;
-            RangesAtEndpoint.Mutable remainder = null;
-            for (Replica r2 : updated)
-            {
-                logger.debug("Comparing {} and {}", r1, r2);
-                //If we will end up transiently replicating send the entire thing and don't subtract
-                if (r1.intersectsOnRange(r2) && !(r1.isFull() && r2.isTransient()))
-                {
-                    RangesAtEndpoint.Mutable oldRemainder = remainder;
-                    remainder = new RangesAtEndpoint.Mutable(current.endpoint());
-                    if (oldRemainder != null)
-                    {
-                        for (Replica replica : oldRemainder)
-                        {
-                            remainder.addAll(replica.subtractIgnoreTransientStatus(r2.range()));
-                        }
-                    }
-                    else
-                    {
-                        remainder.addAll(r1.subtractIgnoreTransientStatus(r2.range()));
-                    }
-                    logger.debug("    Intersects adding {}", remainder);
-                    intersect = true;
-                }
-            }
-            if (!intersect)
-            {
-                logger.debug("    Doesn't intersect adding {}", r1);
-                toStream.add(r1); // should stream whole old range
-            }
-            else
-            {
-                toStream.addAll(remainder);
-            }
-        }
-
-        logger.debug("Calculating toFetch");
-        for (Replica r2 : updated)
-        {
-            boolean intersect = false;
-            RangesAtEndpoint.Mutable remainder = null;
-            for (Replica r1 : current)
-            {
-                logger.info("Comparing {} and {}", r2, r1);
-                //Transitioning from transient to full means fetch everything so intersection doesn't matter.
-                if (r2.intersectsOnRange(r1) && !(r1.isTransient() && r2.isFull()))
-                {
-                    RangesAtEndpoint.Mutable oldRemainder = remainder;
-                    remainder = new RangesAtEndpoint.Mutable(current.endpoint());
-                    if (oldRemainder != null)
-                    {
-                        for (Replica replica : oldRemainder)
-                        {
-                            remainder.addAll(replica.subtractIgnoreTransientStatus(r1.range()));
-                        }
-                    }
-                    else
-                    {
-                        remainder.addAll(r2.subtractIgnoreTransientStatus(r1.range()));
-                    }
-                    logger.debug("    Intersects adding {}", remainder);
-                    intersect = true;
-                }
-            }
-            if (!intersect)
-            {
-                logger.debug("    Doesn't intersect adding {}", r2);
-                toFetch.add(r2); // should fetch whole old range
-            }
-            else
-            {
-                toFetch.addAll(remainder);
-            }
-        }
-
-        logger.debug("To stream {}", toStream);
-        logger.debug("To fetch {}", toFetch);
-
-        return Pair.create(toStream.build(), toFetch.build());
-    }
-
     public void bulkLoad(String directory)
     {
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 2f6deb5..ea54f9d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -70,6 +70,16 @@ public class StreamPlan
     /**
      * Request data in {@code keyspace} and {@code ranges} from specific node.
      *
+     * Here, we have to encode both _local_ range transientness (encoded in Replica itself, in RangesAtEndpoint)
+     * and _remote_ (source) range transientness, which is encoded by splitting ranges into full and transient.
+     *
+     * At the other end the distinction between full and transient is ignored; the node serving the request just
+     * uses the transient status of the Replica objects we send to determine what to send. The real reason we carry
+     * this split down to StreamRequest is that on completion StreamRequest is used to write to the system table
+     * tracking what has already been streamed. At that point we only have the local Replica instances, so we don't
+     * know what we got from the remote. We preserve that information here by splitting based on the remote's
+     * transient status.
+     * 
      * @param from endpoint address to fetch data from.
      * @param keyspace name of keyspace
      * @param fullRanges ranges to fetch that from provides the full version of
@@ -94,10 +104,9 @@ public class StreamPlan
     public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, String... columnFamilies)
     {
         //It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node
-        assert all(fullRanges, Replica::isLocal) || all(fullRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) :
-             fullRanges.toString();
-        assert all(transientRanges, Replica::isLocal) || all(transientRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) :
-        transientRanges.toString();
+        assert all(fullRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
+        assert all(transientRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
+
         StreamSession session = coordinator.getOrCreateNextSession(from);
         session.addStreamRequest(keyspace, fullRanges, transientRanges, Arrays.asList(columnFamilies));
         return this;
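
The paragraph added to this javadoc is also why calculateToFromStreams in RangeRelocator groups the fetch work map by the remote replica's full/transient status before calling requestRanges: the request carries two collections of local replicas, keyed by what kind of data the source will provide, so the completion path can record it. A minimal grouping sketch, with hypothetical stand-ins for FetchReplica and the replica collections (not Cassandra's real types), could look like this:

import java.util.ArrayList;
import java.util.List;

public class FullTransientSplitSketch
{
    record Replica(String range, boolean full) {}   // stand-in for a Replica: a range plus its own full/transient status
    record Fetch(Replica remote, Replica local) {}  // stand-in for RangeStreamer.FetchReplica: source replica + our replica

    static void requestRanges(String from, String keyspace, List<Fetch> fetches)
    {
        List<Replica> fullFromSource = new ArrayList<>();       // the source holds full data for these
        List<Replica> transientFromSource = new ArrayList<>();  // the source holds only transient data for these
        for (Fetch f : fetches)
        {
            // Split on the remote's status, but keep the local replica so its own transientness is preserved.
            if (f.remote().full())
                fullFromSource.add(f.local());
            else
                transientFromSource.add(f.local());
        }
        System.out.printf("request from %s for %s: full=%s transient=%s%n",
                          from, keyspace, fullFromSource, transientFromSource);
    }

    public static void main(String[] args)
    {
        requestRanges("10.0.0.2:7000", "ks",
                      List.of(new Fetch(new Replica("[0,100)", true),    new Replica("[0,100)", true)),
                              new Fetch(new Replica("[100,200)", false), new Replica("[100,200)", true))));
    }
}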

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index ec80772..d7d0836 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -300,6 +300,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     /**
      * Request data fetch task to this session.
      *
+     * Here, we have to encode both _local_ range transientness (encoded in Replica itself, in RangesAtEndpoint)
+     * and _remote_ (source) range transientness, which is encoded by splitting ranges into full and transient.
+     *
      * @param keyspace Requesting keyspace
      * @param fullRanges Ranges to retrieve data that will return full data from the source
      * @param transientRanges Ranges to retrieve data that will return transient data from the source
@@ -308,8 +311,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     public void addStreamRequest(String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, Collection<String> columnFamilies)
     {
         //It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node
-        assert all(fullRanges, Replica::isLocal) || all(fullRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) : fullRanges.toString();
-        assert all(transientRanges, Replica::isLocal) || all(transientRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) : transientRanges.toString();
+        assert all(fullRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
+        assert all(transientRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
+
         requests.add(new StreamRequest(keyspace, fullRanges, transientRanges, columnFamilies));
     }
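
Both this assertion and the one in StreamPlan.requestRanges now delegate the dummy-address check to RangesAtEndpoint.isDummyList instead of comparing every endpoint against "0.0.0.0:0" inline. The helper's body is not part of this hunk; judging only from the predicate it replaces, it could plausibly look like the sketch below, written with a hypothetical simplified type, and the real implementation introduced by this commit may well differ.

import java.util.List;

public class DummyListSketch
{
    record Replica(String hostAddressWithPort) {}   // stand-in exposing only what the check needs

    // Hypothetical reading of an isDummyList-style helper: every endpoint is the placeholder address.
    static boolean isDummyList(List<Replica> replicas)
    {
        // Repair uses a placeholder endpoint; bootstrap/move/rebuild pass the local node instead.
        return replicas.stream().allMatch(r -> r.hostAddressWithPort().equals("0.0.0.0:0"));
    }

    public static void main(String[] args)
    {
        System.out.println(isDummyList(List.of(new Replica("0.0.0.0:0"))));      // true
        System.out.println(isDummyList(List.of(new Replica("127.0.0.1:7000")))); // false
    }
}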
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org