You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:35:57 UTC

[05/18] cassandra git commit: Transient Replication and Cheap Quorums

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
index 447d504..374a760 100644
--- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db.repair;
 
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -42,6 +43,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Schema;
@@ -64,6 +67,9 @@ public class PendingAntiCompactionTest
 {
     private static final Logger logger = LoggerFactory.getLogger(PendingAntiCompactionTest.class);
     private static final Collection<Range<Token>> FULL_RANGE;
+    private static final Collection<Range<Token>> NO_RANGES = Collections.emptyList();
+    private static InetAddressAndPort local;
+
     static
     {
         DatabaseDescriptor.daemonInitialization();
@@ -77,9 +83,10 @@ public class PendingAntiCompactionTest
     private ColumnFamilyStore cfs;
 
     @BeforeClass
-    public static void setupClass()
+    public static void setupClass() throws Throwable
     {
         SchemaLoader.prepareServer();
+        local = InetAddressAndPort.getByName("127.0.0.1");
     }
 
     @Before
@@ -89,6 +96,7 @@ public class PendingAntiCompactionTest
         cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
         SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm);
         cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+
     }
 
     private void makeSSTables(int num)
@@ -105,7 +113,7 @@ public class PendingAntiCompactionTest
 
     private static class InstrumentedAcquisitionCallback extends PendingAntiCompaction.AcquisitionCallback
     {
-        public InstrumentedAcquisitionCallback(UUID parentRepairSession, Collection<Range<Token>> ranges)
+        public InstrumentedAcquisitionCallback(UUID parentRepairSession, RangesAtEndpoint ranges)
         {
             super(parentRepairSession, ranges);
         }
@@ -155,7 +163,7 @@ public class PendingAntiCompactionTest
         ExecutorService executor = Executors.newSingleThreadExecutor();
         try
         {
-            pac = new PendingAntiCompaction(sessionID, tables, ranges, executor);
+            pac = new PendingAntiCompaction(sessionID, tables, atEndpoint(ranges, NO_RANGES), executor);
             pac.run().get();
         }
         finally
@@ -217,7 +225,7 @@ public class PendingAntiCompactionTest
         Assert.assertTrue(repaired.intersects(FULL_RANGE));
         Assert.assertTrue(unrepaired.intersects(FULL_RANGE));
 
-        repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 1, null);
+        repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 1, null, false);
         repaired.reloadSSTableMetadata();
 
         PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
@@ -243,7 +251,7 @@ public class PendingAntiCompactionTest
         Assert.assertTrue(repaired.intersects(FULL_RANGE));
         Assert.assertTrue(unrepaired.intersects(FULL_RANGE));
 
-        repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 0, UUIDGen.getTimeUUID());
+        repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 0, UUIDGen.getTimeUUID(), false);
         repaired.reloadSSTableMetadata();
         Assert.assertTrue(repaired.isPendingRepair());
 
@@ -284,7 +292,7 @@ public class PendingAntiCompactionTest
         PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
         Assert.assertNotNull(result);
 
-        InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE);
+        InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, NO_RANGES));
         Assert.assertTrue(cb.submittedCompactions.isEmpty());
         cb.apply(Lists.newArrayList(result));
 
@@ -308,7 +316,7 @@ public class PendingAntiCompactionTest
         Assert.assertNotNull(result);
         Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state());
 
-        InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE);
+        InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, Collections.emptyList()));
         Assert.assertTrue(cb.submittedCompactions.isEmpty());
         cb.apply(Lists.newArrayList(result, null));
 
@@ -333,7 +341,7 @@ public class PendingAntiCompactionTest
         ColumnFamilyStore cfs2 = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata("system", "peers").id);
         PendingAntiCompaction.AcquireResult fakeResult = new PendingAntiCompaction.AcquireResult(cfs2, null, null);
 
-        InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE);
+        InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, NO_RANGES));
         Assert.assertTrue(cb.submittedCompactions.isEmpty());
         cb.apply(Lists.newArrayList(result, fakeResult));
 
@@ -359,8 +367,19 @@ public class PendingAntiCompactionTest
                                                                  true,0,
                                                                  true,
                                                                  PreviewKind.NONE);
-        CompactionManager.instance.performAnticompaction(result.cfs, FULL_RANGE, result.refs, result.txn,
-                                                         ActiveRepairService.UNREPAIRED_SSTABLE, sessionID, sessionID);
+        CompactionManager.instance.performAnticompaction(result.cfs, atEndpoint(FULL_RANGE, NO_RANGES), result.refs, result.txn, sessionID);
+
+    }
+
+    private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans)
+    {
+        RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local);
+        for (Range<Token> range : full)
+            builder.add(new Replica(local, range, true));
+
+        for (Range<Token> range : trans)
+            builder.add(new Replica(local, range, false));
 
+        return builder.build();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
index 8256ac6..5e44346 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
@@ -114,6 +114,7 @@ public class CassandraOutgoingFileTest
         List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), getTokenAtIndex(4)),
                                                          new Range<>(getTokenAtIndex(2), getTokenAtIndex(6)),
                                                          new Range<>(getTokenAtIndex(5), sstable.last.getToken()));
+        requestedRanges = Range.normalize(requestedRanges);
 
         CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
                                                               sstable.getPositionsForRanges(requestedRanges),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
index 86018af..b597bfe 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
@@ -33,6 +33,8 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -118,10 +120,10 @@ public class CassandraStreamManagerTest
         return Iterables.getOnlyElement(diff);
     }
 
-    private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) throws IOException
+    private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair, boolean isTransient) throws IOException
     {
         Descriptor descriptor = sstable.descriptor;
-        descriptor.getMetadataSerializer().mutateRepaired(descriptor, repairedAt, pendingRepair);
+        descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, repairedAt, pendingRepair, isTransient);
         sstable.reloadSSTableMetadata();
 
     }
@@ -141,7 +143,7 @@ public class CassandraStreamManagerTest
     private Set<SSTableReader> getReadersForRange(Range<Token> range)
     {
         Collection<OutgoingStream> streams = cfs.getStreamManager().createOutgoingStreams(session(NO_PENDING_REPAIR),
-                                                                                          Collections.singleton(range),
+                                                                                          RangesAtEndpoint.toDummyList(Collections.singleton(range)),
                                                                                           NO_PENDING_REPAIR,
                                                                                           PreviewKind.NONE);
         return sstablesFromStreams(streams);
@@ -151,7 +153,7 @@ public class CassandraStreamManagerTest
     {
         IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
         Collection<Range<Token>> ranges = Lists.newArrayList(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
-        Collection<OutgoingStream> streams = cfs.getStreamManager().createOutgoingStreams(session(pendingRepair), ranges, pendingRepair, PreviewKind.NONE);
+        Collection<OutgoingStream> streams = cfs.getStreamManager().createOutgoingStreams(session(pendingRepair), RangesAtEndpoint.toDummyList(ranges), pendingRepair, PreviewKind.NONE);
         return sstablesFromStreams(streams);
     }
 
@@ -167,9 +169,9 @@ public class CassandraStreamManagerTest
 
         UUID pendingRepair = UUIDGen.getTimeUUID();
         long repairedAt = System.currentTimeMillis();
-        mutateRepaired(sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair);
-        mutateRepaired(sstable3, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
-        mutateRepaired(sstable4, repairedAt, NO_PENDING_REPAIR);
+        mutateRepaired(sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, false);
+        mutateRepaired(sstable3, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID(), false);
+        mutateRepaired(sstable4, repairedAt, NO_PENDING_REPAIR, false);
 
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/streaming/StreamRequestTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/StreamRequestTest.java b/test/unit/org/apache/cassandra/db/streaming/StreamRequestTest.java
new file mode 100644
index 0000000..2a6cb65
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/streaming/StreamRequestTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.StreamRequest;
+
+public class StreamRequestTest
+{
+    private static InetAddressAndPort local;
+    private final String ks = "keyspace";
+    private final int version = MessagingService.current_version;
+
+    @BeforeClass
+    public static void setUp() throws Throwable
+    {
+        DatabaseDescriptor.daemonInitialization();
+        local = InetAddressAndPort.getByName("127.0.0.1");
+    }
+
+    @Test
+    public void serializationRoundTrip() throws Throwable
+    {
+        StreamRequest orig = new StreamRequest(ks,
+                                               atEndpoint(Arrays.asList(range(1, 2), range(3, 4), range(5, 6)),
+                                                          Collections.emptyList()),
+                                               atEndpoint(Collections.emptyList(),
+                                                          Arrays.asList(range(5, 6), range(7, 8))),
+                                               Arrays.asList("a", "b", "c"));
+
+        int expectedSize = (int) StreamRequest.serializer.serializedSize(orig, version);
+        try (DataOutputBuffer out = new DataOutputBuffer(expectedSize))
+        {
+            StreamRequest.serializer.serialize(orig, out, version);
+            Assert.assertEquals(expectedSize, out.buffer().limit());
+            try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false))
+            {
+                StreamRequest decoded = StreamRequest.serializer.deserialize(in, version);
+
+                Assert.assertEquals(orig.keyspace, decoded.keyspace);
+                Assert.assertEquals(orig.full, decoded.full);
+                Assert.assertEquals(orig.transientReplicas, decoded.transientReplicas);
+                Assert.assertEquals(orig.columnFamilies, decoded.columnFamilies);
+            }
+        }
+    }
+
+    private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans)
+    {
+        RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local);
+        for (Range<Token> range : full)
+            builder.add(new Replica(local, range, true));
+
+        for (Range<Token> range : trans)
+            builder.add(new Replica(local, range, false));
+
+        return builder.build();
+    }
+
+    private static Range<Token> range(int l, int r)
+    {
+        return new Range<>(new ByteOrderedPartitioner.BytesToken(Integer.toString(l).getBytes()),
+                           new ByteOrderedPartitioner.BytesToken(Integer.toString(r).getBytes()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
index ec0f6b1..7eebef7 100644
--- a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
+++ b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 import org.junit.Assert;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.OrderPreservingPartitioner.StringToken;
@@ -76,12 +77,12 @@ public class ViewUtilsTest
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
         Schema.instance.load(meta);
 
-        Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
-                                                                                        new StringToken("CA"),
-                                                                                        new StringToken("BB"));
+        Optional<Replica> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+                                                                             new StringToken("CA"),
+                                                                             new StringToken("BB"));
 
         Assert.assertTrue(naturalEndpoint.isPresent());
-        Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.2"), naturalEndpoint.get());
+        Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.2"), naturalEndpoint.get().endpoint());
     }
 
 
@@ -109,12 +110,12 @@ public class ViewUtilsTest
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
         Schema.instance.load(meta);
 
-        Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
-                                                                                        new StringToken("CA"),
-                                                                                        new StringToken("BB"));
+        Optional<Replica> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+                                                                             new StringToken("CA"),
+                                                                             new StringToken("BB"));
 
         Assert.assertTrue(naturalEndpoint.isPresent());
-        Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.1"), naturalEndpoint.get());
+        Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.1"), naturalEndpoint.get().endpoint());
     }
 
     @Test
@@ -141,9 +142,9 @@ public class ViewUtilsTest
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
         Schema.instance.load(meta);
 
-        Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
-                                                                                        new StringToken("AB"),
-                                                                                        new StringToken("BB"));
+        Optional<Replica> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+                                                                             new StringToken("AB"),
+                                                                             new StringToken("BB"));
 
         Assert.assertFalse(naturalEndpoint.isPresent());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index f11cb62..8ae6853 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -18,18 +18,20 @@
 package org.apache.cassandra.dht;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 import java.net.UnknownHostException;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 
+import org.apache.cassandra.dht.RangeStreamer.FetchReplica;
 import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
 
 import org.junit.AfterClass;
@@ -41,6 +43,7 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.tokenallocator.TokenAllocation;
@@ -60,6 +63,7 @@ public class BootStrapperTest
 {
     static IPartitioner oldPartitioner;
 
+    static Predicate<Replica> originalAlivePredicate = RangeStreamer.ALIVE_PREDICATE;
     @BeforeClass
     public static void setup() throws ConfigurationException
     {
@@ -68,12 +72,14 @@ public class BootStrapperTest
         SchemaLoader.startGossiper();
         SchemaLoader.prepareServer();
         SchemaLoader.schemaDefinition("BootStrapperTest");
+        RangeStreamer.ALIVE_PREDICATE = Predicates.alwaysTrue();
     }
 
     @AfterClass
     public static void tearDown()
     {
         DatabaseDescriptor.setPartitionerUnsafe(oldPartitioner);
+        RangeStreamer.ALIVE_PREDICATE = originalAlivePredicate;
     }
 
     @Test
@@ -82,7 +88,7 @@ public class BootStrapperTest
         final int[] clusterSizes = new int[] { 1, 3, 5, 10, 100};
         for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces())
         {
-            int replicationFactor = Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor();
+            int replicationFactor = Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor().allReplicas;
             for (int clusterSize : clusterSizes)
                 if (clusterSize >= replicationFactor)
                     testSourceTargetComputation(keyspaceName, clusterSize, replicationFactor);
@@ -115,21 +121,25 @@ public class BootStrapperTest
             public void forceConviction(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
         };
         s.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(mockFailureDetector));
+        assertNotNull(Keyspace.open(keyspaceName));
         s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint));
 
-        Collection<Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> toFetch = s.toFetch().get(keyspaceName);
 
-        // Check we get get RF new ranges in total
-        Set<Range<Token>> ranges = new HashSet<>();
-        for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> e : toFetch)
-            ranges.addAll(e.getValue());
+        Collection<Multimap<InetAddressAndPort, FetchReplica>> toFetch = s.toFetch().get(keyspaceName);
 
-        assertEquals(replicationFactor, ranges.size());
+        // Check we get RF new ranges in total
+        long rangesCount = toFetch.stream()
+               .map(Multimap::values)
+               .flatMap(Collection::stream)
+               .map(f -> f.remote)
+               .map(Replica::range)
+               .count();
+        assertEquals(replicationFactor, rangesCount);
 
         // there isn't any point in testing the size of these collections for any specific size.  When a random partitioner
         // is used, they will vary.
-        assert toFetch.iterator().next().getValue().size() > 0;
-        assert !toFetch.iterator().next().getKey().equals(myEndpoint);
+        assert toFetch.stream().map(Multimap::values).flatMap(Collection::stream).count() > 0;
+        assert toFetch.stream().map(Multimap::keySet).map(Collection::stream).noneMatch(myEndpoint::equals);
         return s;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
index 78e87c1..07d6377 100644
--- a/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
+++ b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
@@ -25,8 +25,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 
-import com.google.common.collect.HashMultimap;
+import com.google.common.base.Predicate;
 import com.google.common.collect.Multimap;
+import org.apache.cassandra.locator.EndpointsByRange;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -34,6 +35,8 @@ import org.junit.Test;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaUtils;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -78,14 +81,14 @@ public class RangeFetchMapCalculatorTest
     @Test
     public void testWithSingleSource() throws Exception
     {
-        Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+        EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
         addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
         addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
         addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
         addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.4");
         addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.5");
 
-        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test");
         Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
         validateRange(rangesWithSources, map);
 
@@ -95,14 +98,14 @@ public class RangeFetchMapCalculatorTest
     @Test
     public void testWithNonOverlappingSource() throws Exception
     {
-        Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+        EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
         addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.2");
         addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.4");
         addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.5", "127.0.0.6");
         addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.7", "127.0.0.8");
         addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.9", "127.0.0.10");
 
-        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test");
         Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
         validateRange(rangesWithSources, map);
 
@@ -112,12 +115,12 @@ public class RangeFetchMapCalculatorTest
     @Test
     public void testWithRFThreeReplacement() throws Exception
     {
-        Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+        EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
         addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.2");
         addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2", "127.0.0.3");
         addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3", "127.0.0.4");
 
-        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test");
         Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
         validateRange(rangesWithSources, map);
 
@@ -128,14 +131,14 @@ public class RangeFetchMapCalculatorTest
     @Test
     public void testForMultipleRoundsComputation() throws Exception
     {
-        Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+        EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
         addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3");
         addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3");
         addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
         addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.3");
         addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.3", "127.0.0.2");
 
-        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test");
         Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
         validateRange(rangesWithSources, map);
 
@@ -150,14 +153,14 @@ public class RangeFetchMapCalculatorTest
     @Test
     public void testForMultipleRoundsComputationWithLocalHost() throws Exception
     {
-        Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+        EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
         addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
         addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1");
         addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.1");
         addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.1");
         addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.1", "127.0.0.2");
 
-        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test");
         Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
         validateRange(rangesWithSources, map);
 
@@ -170,14 +173,14 @@ public class RangeFetchMapCalculatorTest
     @Test
     public void testForEmptyGraph() throws Exception
     {
-        Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+        EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
         addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
         addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1");
         addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.1");
         addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.1");
         addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.1");
 
-        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test");
         Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
         //All ranges map to local host so we will not stream anything.
         assertTrue(map.isEmpty());
@@ -186,31 +189,28 @@ public class RangeFetchMapCalculatorTest
     @Test
     public void testWithNoSourceWithLocal() throws Exception
     {
-        Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+        EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
         addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.5");
         addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
         addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
 
         //Return false for all except 127.0.0.5
-        final RangeStreamer.ISourceFilter filter = new RangeStreamer.ISourceFilter()
+        final Predicate<Replica> filter = replica ->
         {
-            public boolean shouldInclude(InetAddressAndPort endpoint)
+            try
             {
-                try
-                {
-                    if (endpoint.equals(InetAddressAndPort.getByName("127.0.0.5")))
-                        return false;
-                    else
-                        return true;
-                }
-                catch (UnknownHostException e)
-                {
+                if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.5")))
+                    return false;
+                else
                     return true;
-                }
+            }
+            catch (UnknownHostException e)
+            {
+                return true;
             }
         };
 
-        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Arrays.asList(filter), "Test");
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Arrays.asList(filter), "Test");
         Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
 
         validateRange(rangesWithSources, map);
@@ -225,32 +225,26 @@ public class RangeFetchMapCalculatorTest
     @Test (expected = IllegalStateException.class)
     public void testWithNoLiveSource() throws Exception
     {
-        Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+        EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
         addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.5");
         addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
         addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
 
-        final RangeStreamer.ISourceFilter allDeadFilter = new RangeStreamer.ISourceFilter()
-        {
-            public boolean shouldInclude(InetAddressAndPort endpoint)
-            {
-                return false;
-            }
-        };
+        final Predicate<Replica> allDeadFilter = replica -> false;
 
-        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Arrays.asList(allDeadFilter), "Test");
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Arrays.asList(allDeadFilter), "Test");
         calculator.getRangeFetchMap();
     }
 
     @Test
     public void testForLocalDC() throws Exception
     {
-        Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+        EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
         addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.3", "127.0.0.53");
         addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1", "127.0.0.3", "127.0.0.57");
         addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59", "127.0.0.61");
 
-        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<>(), "Test");
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), new ArrayList<>(), "Test");
         Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
         validateRange(rangesWithSources, map);
         Assert.assertEquals(2, map.asMap().size());
@@ -263,31 +257,28 @@ public class RangeFetchMapCalculatorTest
     @Test
     public void testForRemoteDC() throws Exception
     {
-        Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+        EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
         addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
         addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55");
         addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59");
 
         //Reject only 127.0.0.3 and accept everyone else
-        final RangeStreamer.ISourceFilter localHostFilter = new RangeStreamer.ISourceFilter()
+        final Predicate<Replica> localHostFilter = replica ->
         {
-            public boolean shouldInclude(InetAddressAndPort endpoint)
+            try
             {
-                try
-                {
-                    if (endpoint.equals(InetAddressAndPort.getByName("127.0.0.3")))
-                        return false;
-                    else
-                        return true;
-                }
-                catch (UnknownHostException e)
-                {
+                if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3")))
+                    return false;
+                else
                     return true;
-                }
+            }
+            catch (UnknownHostException e)
+            {
+                return true;
             }
         };
 
-        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Arrays.asList(localHostFilter), "Test");
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Arrays.asList(localHostFilter), "Test");
         Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
         validateRange(rangesWithSources, map);
         Assert.assertEquals(3, map.asMap().size());
@@ -301,14 +292,14 @@ public class RangeFetchMapCalculatorTest
     @Test
     public void testTrivialRanges() throws UnknownHostException
     {
-        Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+        EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
         // add non-trivial ranges
         addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
         addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55");
         addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59");
         // and a trivial one:
         addTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
-        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Collections.emptyList(), "Test");
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test");
         Multimap<InetAddressAndPort, Range<Token>> optMap = calculator.getRangeFetchMapForNonTrivialRanges();
         Multimap<InetAddressAndPort, Range<Token>> trivialMap = calculator.getRangeFetchMapForTrivialRanges(optMap);
         assertTrue(trivialMap.get(InetAddressAndPort.getByName("127.0.0.3")).contains(generateTrivialRange(1,10)) ^
@@ -319,7 +310,7 @@ public class RangeFetchMapCalculatorTest
     @Test(expected = IllegalStateException.class)
     public void testNotEnoughEndpointsForTrivialRange() throws UnknownHostException
     {
-        Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+        EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
         // add non-trivial ranges
         addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
         addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55");
@@ -327,23 +318,20 @@ public class RangeFetchMapCalculatorTest
         // and a trivial one:
         addTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3");
 
-        RangeStreamer.ISourceFilter filter = new RangeStreamer.ISourceFilter()
+        Predicate<Replica> filter = replica ->
         {
-            public boolean shouldInclude(InetAddressAndPort endpoint)
+            try
             {
-                try
-                {
-                    if (endpoint.equals(InetAddressAndPort.getByName("127.0.0.3")))
-                        return false;
-                }
-                catch (UnknownHostException e)
-                {
-                    throw new RuntimeException(e);
-                }
-                return true;
+                if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3")))
+                    return false;
+            }
+            catch (UnknownHostException e)
+            {
+                throw new RuntimeException(e);
             }
+            return true;
         };
-        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Collections.singleton(filter), "Test");
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.singleton(filter), "Test");
         Multimap<InetAddressAndPort, Range<Token>> optMap = calculator.getRangeFetchMapForNonTrivialRanges();
         Multimap<InetAddressAndPort, Range<Token>> trivialMap = calculator.getRangeFetchMapForTrivialRanges(optMap);
 
@@ -355,27 +343,29 @@ public class RangeFetchMapCalculatorTest
         assertTrue(result.containsAll(expected));
     }
 
-    private void validateRange(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> result)
+    private void validateRange(EndpointsByRange.Mutable rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> result)
     {
         for (Map.Entry<InetAddressAndPort, Range<Token>> entry : result.entries())
         {
-            assertTrue(rangesWithSources.get(entry.getValue()).contains(entry.getKey()));
+            assertTrue(rangesWithSources.get(entry.getValue()).endpoints().contains(entry.getKey()));
         }
     }
 
-    private void addNonTrivialRangeAndSources(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, int left, int right, String... hosts) throws UnknownHostException
+    private void addNonTrivialRangeAndSources(EndpointsByRange.Mutable rangesWithSources, int left, int right, String... hosts) throws UnknownHostException
     {
         for (InetAddressAndPort endpoint : makeAddrs(hosts))
         {
-            rangesWithSources.put(generateNonTrivialRange(left, right), endpoint);
+            Range<Token> range = generateNonTrivialRange(left, right);
+            rangesWithSources.put(range, Replica.fullReplica(endpoint, range));
         }
     }
 
-    private void addTrivialRangeAndSources(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, int left, int right, String... hosts) throws UnknownHostException
+    private void addTrivialRangeAndSources(EndpointsByRange.Mutable rangesWithSources, int left, int right, String... hosts) throws UnknownHostException
     {
         for (InetAddressAndPort endpoint : makeAddrs(hosts))
         {
-            rangesWithSources.put(generateTrivialRange(left, right), endpoint);
+            Range<Token> range = generateTrivialRange(left, right);
+            rangesWithSources.put(range, Replica.fullReplica(endpoint, range));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/dht/RangeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/RangeTest.java b/test/unit/org/apache/cassandra/dht/RangeTest.java
index 495979e..36a8da1 100644
--- a/test/unit/org/apache/cassandra/dht/RangeTest.java
+++ b/test/unit/org/apache/cassandra/dht/RangeTest.java
@@ -642,7 +642,7 @@ public class RangeTest
             Range.OrderedRangeContainmentChecker checker = new Range.OrderedRangeContainmentChecker(ranges);
             for (Token t : tokensToTest)
             {
-                if (checker.contains(t) != Range.isInRanges(t, ranges)) // avoid running Joiner.on(..) every iteration
+                if (checker.test(t) != Range.isInRanges(t, ranges)) // avoid running Joiner.on(..) every iteration
                     fail(String.format("This should never flap! If it does, it is a bug (ranges = %s, token = %s)", Joiner.on(",").join(ranges), t));
             }
         }
@@ -653,11 +653,11 @@ public class RangeTest
     {
         List<Range<Token>> ranges = asList(r(Long.MIN_VALUE, Long.MIN_VALUE + 1), r(Long.MAX_VALUE - 1, Long.MAX_VALUE));
         Range.OrderedRangeContainmentChecker checker = new Range.OrderedRangeContainmentChecker(ranges);
-        assertFalse(checker.contains(t(Long.MIN_VALUE)));
-        assertTrue(checker.contains(t(Long.MIN_VALUE + 1)));
-        assertFalse(checker.contains(t(0)));
-        assertFalse(checker.contains(t(Long.MAX_VALUE - 1)));
-        assertTrue(checker.contains(t(Long.MAX_VALUE)));
+        assertFalse(checker.test(t(Long.MIN_VALUE)));
+        assertTrue(checker.test(t(Long.MIN_VALUE + 1)));
+        assertFalse(checker.test(t(0)));
+        assertFalse(checker.test(t(Long.MAX_VALUE - 1)));
+        assertTrue(checker.test(t(Long.MAX_VALUE)));
     }
 
     private static Range<Token> r(long left, long right)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/dht/SplitterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/SplitterTest.java b/test/unit/org/apache/cassandra/dht/SplitterTest.java
index 322a57c..c591499 100644
--- a/test/unit/org/apache/cassandra/dht/SplitterTest.java
+++ b/test/unit/org/apache/cassandra/dht/SplitterTest.java
@@ -62,13 +62,54 @@ public class SplitterTest
         randomSplitTestVNodes(new Murmur3Partitioner());
     }
 
+    @Test
+    public void testWithWeight()
+    {
+        List<Splitter.WeightedRange> ranges = new ArrayList<>();
+        ranges.add(new Splitter.WeightedRange(1.0, t(0, 10)));
+        ranges.add(new Splitter.WeightedRange(1.0, t(20, 30)));
+        ranges.add(new Splitter.WeightedRange(0.5, t(40, 60)));
+
+        List<Splitter.WeightedRange> ranges2 = new ArrayList<>();
+        ranges2.add(new Splitter.WeightedRange(1.0, t(0, 10)));
+        ranges2.add(new Splitter.WeightedRange(1.0, t(20, 30)));
+        ranges2.add(new Splitter.WeightedRange(1.0, t(40, 50)));
+        IPartitioner partitioner = Murmur3Partitioner.instance;
+        Splitter splitter = partitioner.splitter().get();
+
+        assertEquals(splitter.splitOwnedRanges(2, ranges, false), splitter.splitOwnedRanges(2, ranges2, false));
+    }
+
+    @Test
+    public void testWithWeight2()
+    {
+        List<Splitter.WeightedRange> ranges = new ArrayList<>();
+        ranges.add(new Splitter.WeightedRange(0.2, t(0, 10)));
+        ranges.add(new Splitter.WeightedRange(1.0, t(20, 30)));
+        ranges.add(new Splitter.WeightedRange(1.0, t(40, 50)));
+
+        List<Splitter.WeightedRange> ranges2 = new ArrayList<>();
+        ranges2.add(new Splitter.WeightedRange(1.0, t(0, 2)));
+        ranges2.add(new Splitter.WeightedRange(1.0, t(20, 30)));
+        ranges2.add(new Splitter.WeightedRange(1.0, t(40, 50)));
+        IPartitioner partitioner = Murmur3Partitioner.instance;
+        Splitter splitter = partitioner.splitter().get();
+
+        assertEquals(splitter.splitOwnedRanges(2, ranges, false), splitter.splitOwnedRanges(2, ranges2, false));
+    }
+
+    private Range<Token> t(long left, long right)
+    {
+        return new Range<>(new Murmur3Partitioner.LongToken(left), new Murmur3Partitioner.LongToken(right));
+    }
+
     private static void randomSplitTestNoVNodes(IPartitioner partitioner)
     {
         Splitter splitter = getSplitter(partitioner);
         Random r = new Random();
         for (int i = 0; i < 10000; i++)
         {
-            List<Range<Token>> localRanges = generateLocalRanges(1, r.nextInt(4) + 1, splitter, r, partitioner instanceof RandomPartitioner);
+            List<Splitter.WeightedRange> localRanges = generateLocalRanges(1, r.nextInt(4) + 1, splitter, r, partitioner instanceof RandomPartitioner);
             List<Token> boundaries = splitter.splitOwnedRanges(r.nextInt(9) + 1, localRanges, false);
             assertTrue("boundaries = " + boundaries + " ranges = " + localRanges, assertRangeSizeEqual(localRanges, boundaries, partitioner, splitter, true));
         }
@@ -84,14 +125,14 @@ public class SplitterTest
             int numTokens = 172 + r.nextInt(128);
             int rf = r.nextInt(4) + 2;
             int parts = r.nextInt(5) + 1;
-            List<Range<Token>> localRanges = generateLocalRanges(numTokens, rf, splitter, r, partitioner instanceof RandomPartitioner);
+            List<Splitter.WeightedRange> localRanges = generateLocalRanges(numTokens, rf, splitter, r, partitioner instanceof RandomPartitioner);
             List<Token> boundaries = splitter.splitOwnedRanges(parts, localRanges, true);
             if (!assertRangeSizeEqual(localRanges, boundaries, partitioner, splitter, false))
                 fail(String.format("Could not split %d tokens with rf=%d into %d parts (localRanges=%s, boundaries=%s)", numTokens, rf, parts, localRanges, boundaries));
         }
     }
 
-    private static boolean assertRangeSizeEqual(List<Range<Token>> localRanges, List<Token> tokens, IPartitioner partitioner, Splitter splitter, boolean splitIndividualRanges)
+    private static boolean assertRangeSizeEqual(List<Splitter.WeightedRange> localRanges, List<Token> tokens, IPartitioner partitioner, Splitter splitter, boolean splitIndividualRanges)
     {
         Token start = partitioner.getMinimumToken();
         List<BigInteger> splits = new ArrayList<>();
@@ -119,27 +160,27 @@ public class SplitterTest
         return allBalanced;
     }
 
-    private static BigInteger sumOwnedBetween(List<Range<Token>> localRanges, Token start, Token end, Splitter splitter, boolean splitIndividualRanges)
+    private static BigInteger sumOwnedBetween(List<Splitter.WeightedRange> localRanges, Token start, Token end, Splitter splitter, boolean splitIndividualRanges)
     {
         BigInteger sum = BigInteger.ZERO;
-        for (Range<Token> range : localRanges)
+        for (Splitter.WeightedRange range : localRanges)
         {
             if (splitIndividualRanges)
             {
-                Set<Range<Token>> intersections = new Range<>(start, end).intersectionWith(range);
+                Set<Range<Token>> intersections = new Range<>(start, end).intersectionWith(range.range());
                 for (Range<Token> intersection : intersections)
                     sum = sum.add(splitter.valueForToken(intersection.right).subtract(splitter.valueForToken(intersection.left)));
             }
             else
             {
-                if (new Range<>(start, end).contains(range.left))
-                    sum = sum.add(splitter.valueForToken(range.right).subtract(splitter.valueForToken(range.left)));
+                if (new Range<>(start, end).contains(range.left()))
+                    sum = sum.add(splitter.valueForToken(range.right()).subtract(splitter.valueForToken(range.left())));
             }
         }
         return sum;
     }
 
-    private static List<Range<Token>> generateLocalRanges(int numTokens, int rf, Splitter splitter, Random r, boolean randomPartitioner)
+    private static List<Splitter.WeightedRange> generateLocalRanges(int numTokens, int rf, Splitter splitter, Random r, boolean randomPartitioner)
     {
         int localTokens = numTokens * rf;
         List<Token> randomTokens = new ArrayList<>();
@@ -152,11 +193,11 @@ public class SplitterTest
 
         Collections.sort(randomTokens);
 
-        List<Range<Token>> localRanges = new ArrayList<>(localTokens);
+        List<Splitter.WeightedRange> localRanges = new ArrayList<>(localTokens);
         for (int i = 0; i < randomTokens.size() - 1; i++)
         {
             assert randomTokens.get(i).compareTo(randomTokens.get(i + 1)) < 0;
-            localRanges.add(new Range<>(randomTokens.get(i), randomTokens.get(i + 1)));
+            localRanges.add(new Splitter.WeightedRange(1.0, new Range<>(randomTokens.get(i), randomTokens.get(i + 1))));
             i++;
         }
         return localRanges;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
index bf71c09..34096a7 100644
--- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
+++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.dht;
 
 import java.util.Collections;
 
+import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -53,7 +54,7 @@ public class StreamStateStoreTest
 
         InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
         StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE);
-        session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf"));
+        session.addStreamRequest("keyspace1", RangesAtEndpoint.toDummyList(Collections.singleton(range)), RangesAtEndpoint.toDummyList(Collections.emptyList()), Collections.singleton("cf"));
 
         StreamStateStore store = new StreamStateStore();
         // session complete event that is not completed makes data not available for keyspace/ranges
@@ -74,7 +75,7 @@ public class StreamStateStoreTest
         // add different range within the same keyspace
         Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200"));
         session = new StreamSession(StreamOperation.BOOTSTRAP, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE);
-        session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf"));
+        session.addStreamRequest("keyspace1", RangesAtEndpoint.toDummyList(Collections.singleton(range2)), RangesAtEndpoint.toDummyList(Collections.emptyList()), Collections.singleton("cf"));
         session.state(StreamSession.State.COMPLETE);
         store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
index 833ee8b..0710945 100644
--- a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
+++ b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
@@ -62,7 +62,7 @@ public class PendingRangeCalculatorServiceTest
     @BMRule(name = "Block pending range calculation",
             targetClass = "TokenMetadata",
             targetMethod = "calculatePendingRanges",
-            targetLocation = "AT INVOKE org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressRanges",
+            targetLocation = "AT INVOKE org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressReplicas",
             action = "org.apache.cassandra.gms.PendingRangeCalculatorServiceTest.calculationLock.lock()")
     public void testDelayedResponse() throws UnknownHostException, InterruptedException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index fccb344..9e3594b 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -69,7 +69,7 @@ public class BigTableWriterTest extends AbstractTransactionalTest
 
         private TestableBTW(Descriptor desc)
         {
-            this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, null,
+            this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, null, false,
                                                new SerializationHeader(true, cfs.metadata(),
                                                                        cfs.metadata().regularAndStaticColumns(),
                                                                        EncodingStats.NO_STATS)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index dbb929d..faf46bc 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -649,7 +649,7 @@ public class CQLSSTableWriterTest
             public void init(String keyspace)
             {
                 this.keyspace = keyspace;
-                for (Range<Token> range : StorageService.instance.getLocalRanges(ks))
+                for (Range<Token> range : StorageService.instance.getLocalReplicas(ks).ranges())
                     addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort());
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 044cd9f..c4ccf48 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -176,21 +176,26 @@ public class LegacySSTableTest
             {
                 for (SSTableReader sstable : cfs.getLiveSSTables())
                 {
-                    sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1234, NO_PENDING_REPAIR);
+                    sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, 1234, NO_PENDING_REPAIR, false);
                     sstable.reloadSSTableMetadata();
                     assertEquals(1234, sstable.getRepairedAt());
                     if (sstable.descriptor.version.hasPendingRepair())
                         assertEquals(NO_PENDING_REPAIR, sstable.getPendingRepair());
                 }
 
+                boolean isTransient = false;
                 for (SSTableReader sstable : cfs.getLiveSSTables())
                 {
                     UUID random = UUID.randomUUID();
-                    sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, UNREPAIRED_SSTABLE, random);
+                    sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, UNREPAIRED_SSTABLE, random, isTransient);
                     sstable.reloadSSTableMetadata();
                     assertEquals(UNREPAIRED_SSTABLE, sstable.getRepairedAt());
                     if (sstable.descriptor.version.hasPendingRepair())
                         assertEquals(random, sstable.getPendingRepair());
+                    if (sstable.descriptor.version.hasIsTransient())
+                        assertEquals(isTransient, sstable.isTransient());
+
+                    isTransient = !isTransient;
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index 8509115..5d40f8c 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -31,14 +31,13 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.schema.KeyspaceParams;
@@ -109,8 +108,8 @@ public class SSTableLoaderTest
         public void init(String keyspace)
         {
             this.keyspace = keyspace;
-            for (Range<Token> range : StorageService.instance.getLocalRanges(KEYSPACE1))
-                addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort());
+            for (Replica replica : StorageService.instance.getLocalReplicas(KEYSPACE1))
+                addRangeForEndpoint(replica.range(), FBUtilities.getBroadcastAddressAndPort());
         }
 
         public TableMetadataRef getTableMetadata(String tableName)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 6412ef4..7c47c8b 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -877,7 +877,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
             File dir = cfs.getDirectories().getDirectoryForNewSSTables();
             Descriptor desc = cfs.newSSTableDescriptor(dir);
 
-            try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, null, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)))
+            try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, null, false, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)))
             {
                 int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount;
                 for ( ; i < end ; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 441a3b9..731cee2 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -219,7 +219,7 @@ public class SSTableUtils
             TableMetadata metadata = Schema.instance.getTableMetadata(ksname, cfname);
             ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.id);
             SerializationHeader header = appender.header();
-            SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, Descriptor.fromFilename(datafile.getAbsolutePath()), expectedSize, UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, 0, header);
+            SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, Descriptor.fromFilename(datafile.getAbsolutePath()), expectedSize, UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, false, 0, header);
             while (appender.append(writer)) { /* pass */ }
             Collection<SSTableReader> readers = writer.finish(true);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
index 5d62cdb..31d0b89 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.UUID;
 
 import org.junit.Test;
 
@@ -32,9 +33,12 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static junit.framework.Assert.fail;
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -245,4 +249,76 @@ public class SSTableWriterTest extends SSTableWriterTestBase
         }
     }
 
+    /**
+     * Asserts that a writer can be opened with the given repair metadata, i.e. that
+     * {@code getWriter} does not throw {@link IllegalArgumentException} for this combination.
+     * The offline transaction is aborted, and deletions awaited, whether or not the assertion holds.
+     */
+    private static void assertValidRepairMetadata(long repairedAt, UUID pendingRepair, boolean isTransient)
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_SMALL_MAX_VALUE);
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+
+        try (SSTableWriter writer = getWriter(cfs, dir, txn, repairedAt, pendingRepair, isTransient))
+        {
+            // nothing to write: opening the writer without an exception is the success condition
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new AssertionError("Unexpected IllegalArgumentException", e);
+        }
+        finally
+        {
+            // runs after the writer has been closed; keeps a failed assertion from leaking the transaction
+            txn.abort();
+            LifecycleTransaction.waitForDeletions();
+        }
+    }
+
+    /**
+     * Asserts that opening a writer with the given repair metadata is rejected with an
+     * {@link IllegalArgumentException}.
+     * The offline transaction is aborted, and deletions awaited, whether or not the assertion holds.
+     */
+    private static void assertInvalidRepairMetadata(long repairedAt, UUID pendingRepair, boolean isTransient)
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_SMALL_MAX_VALUE);
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+
+        try (SSTableWriter writer = getWriter(cfs, dir, txn, repairedAt, pendingRepair, isTransient))
+        {
+            fail("Expected IllegalArgumentException");
+        }
+        catch (IllegalArgumentException e)
+        {
+            // expected: this combination must be rejected
+        }
+        finally
+        {
+            // runs after the writer has been closed; keeps a failed assertion from leaking the transaction
+            txn.abort();
+            LifecycleTransaction.waitForDeletions();
+        }
+    }
+
+    /**
+     * It should only be possible to create sstables marked transient that also have a pending repair.
+     * The cases below also expect a pending repair to be rejected when repairedAt is not UNREPAIRED_SSTABLE.
+     */
+    @Test
+    public void testRepairMetadataValidation()
+    {
+        assertValidRepairMetadata(UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, false);
+        assertValidRepairMetadata(1, NO_PENDING_REPAIR, false);
+        assertValidRepairMetadata(UNREPAIRED_SSTABLE, UUID.randomUUID(), false);
+        assertValidRepairMetadata(UNREPAIRED_SSTABLE, UUID.randomUUID(), true);
+
+        assertInvalidRepairMetadata(UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, true);
+        assertInvalidRepairMetadata(1, UUID.randomUUID(), false);
+        assertInvalidRepairMetadata(1, NO_PENDING_REPAIR, true);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
index d42c49b..962e1a1 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
@@ -48,6 +49,7 @@ import org.apache.cassandra.utils.FBUtilities;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class SSTableWriterTestBase extends SchemaLoader
 {
@@ -161,10 +163,15 @@ public class SSTableWriterTestBase extends SchemaLoader
             assertFalse(CompactionManager.instance.submitMaximal(cfs, cfs.gcBefore((int) (System.currentTimeMillis() / 1000)), false).isEmpty());
     }
 
-    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
+    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn, long repairedAt, UUID pendingRepair, boolean isTransient)
     {
         Descriptor desc = cfs.newSSTableDescriptor(directory);
-        return SSTableWriter.create(desc, 0, 0, null, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS), cfs.indexManager.listIndexes(), txn);
+        return SSTableWriter.create(desc, 0, repairedAt, pendingRepair, isTransient, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS), cfs.indexManager.listIndexes(), txn);
+    }
+
+    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
+    {
+        return getWriter(cfs, directory, txn, 0, null, false);
     }
 
     public static ByteBuffer random(int i, int size)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
index 9aa4e28..aea3b4a 100644
--- a/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
@@ -96,7 +96,7 @@ public class SSTableFlushObserverTest
                                                                   KS_NAME, CF_NAME,
                                                                   0,
                                                                   sstableFormat),
-                                                   10L, 0L, null, TableMetadataRef.forOfflineTools(cfm),
+                                                   10L, 0L, null, false, TableMetadataRef.forOfflineTools(cfm),
                                                    new MetadataCollector(cfm.comparator).sstableLevel(0),
                                                    new SerializationHeader(true, cfm, cfm.regularAndStaticColumns(), EncodingStats.NO_STATS),
                                                    Collections.singletonList(observer),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index 8ab1511..f109d8f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -99,7 +99,7 @@ public class MetadataSerializerTest
 
         String partitioner = RandomPartitioner.class.getCanonicalName();
         double bfFpChance = 0.1;
-        return collector.finalizeMetadata(partitioner, bfFpChance, 0, null, SerializationHeader.make(cfm, Collections.emptyList()));
+        return collector.finalizeMetadata(partitioner, bfFpChance, 0, null, false, SerializationHeader.make(cfm, Collections.emptyList()));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
index 202d7f1..bf1d940 100644
--- a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
@@ -50,6 +50,16 @@ public class DynamicEndpointSnitchTest
         Thread.sleep(150);
     }
 
+    private static EndpointsForRange full(InetAddressAndPort... endpoints)
+    {
+        // one ReplicaUtils.full(...) entry per endpoint over ReplicaUtils.FULL_RANGE, in argument order
+        EndpointsForRange.Builder builder = EndpointsForRange.builder(ReplicaUtils.FULL_RANGE, endpoints.length);
+        for (int i = 0; i < endpoints.length; i++)
+            builder.add(ReplicaUtils.full(endpoints[i]));
+
+        return builder.build();
+    }
+
     @Test
     public void testSnitch() throws InterruptedException, IOException, ConfigurationException
     {
@@ -66,41 +76,41 @@ public class DynamicEndpointSnitchTest
 
         // first, make all hosts equal
         setScores(dsnitch, 1, hosts, 10, 10, 10);
-        List<InetAddressAndPort> order = Arrays.asList(host1, host2, host3);
-        assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3)));
+        EndpointsForRange order = full(host1, host2, host3);
+        assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3)));
 
         // make host1 a little worse
         setScores(dsnitch, 1, hosts, 20, 10, 10);
-        order = Arrays.asList(host2, host3, host1);
-        assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3)));
+        order = full(host2, host3, host1);
+        assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3)));
 
         // make host2 as bad as host1
         setScores(dsnitch, 2, hosts, 15, 20, 10);
-        order = Arrays.asList(host3, host1, host2);
-        assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3)));
+        order = full(host3, host1, host2);
+        assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3)));
 
         // make host3 the worst
         setScores(dsnitch, 3, hosts, 10, 10, 30);
-        order = Arrays.asList(host1, host2, host3);
-        assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3)));
+        order = full(host1, host2, host3);
+        assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3)));
 
         // make host3 equal to the others
         setScores(dsnitch, 5, hosts, 10, 10, 10);
-        order = Arrays.asList(host1, host2, host3);
-        assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3)));
+        order = full(host1, host2, host3);
+        assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3)));
 
         /// Tests CASSANDRA-6683 improvements
         // make the scores differ enough from the ideal order that we sort by score; under the old
         // dynamic snitch behavior (where we only compared neighbors), these wouldn't get sorted
         setScores(dsnitch, 20, hosts, 10, 70, 20);
-        order = Arrays.asList(host1, host3, host2);
-        assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3)));
+        order = full(host1, host3, host2);
+        assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3)));
 
-        order = Arrays.asList(host4, host1, host3, host2);
-        assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3, host4)));
+        order = full(host4, host1, host3, host2);
+        assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3, host4)));
 
         setScores(dsnitch, 20, hosts, 10, 10, 10);
-        order = Arrays.asList(host4, host1, host2, host3);
-        assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3, host4)));
+        order = full(host4, host1, host2, host3);
+        assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3, host4)));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
index ab6c6cd..5f6e26f 100644
--- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
@@ -25,6 +25,7 @@ import java.util.stream.Collectors;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 
 import org.junit.Assert;
@@ -36,12 +37,17 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
 import org.apache.cassandra.dht.OrderPreservingPartitioner.StringToken;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.TokenMetadata.Topology;
 import org.apache.cassandra.service.StorageService;
 
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+
 public class NetworkTopologyStrategyTest
 {
     private String keyspaceName = "Keyspace1";
@@ -51,6 +57,7 @@ public class NetworkTopologyStrategyTest
     public static void setupDD()
     {
         DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
     }
 
     @Test
@@ -68,13 +75,14 @@ public class NetworkTopologyStrategyTest
 
         // Set the localhost to the tokenmetadata. Embedded cassandra way?
         NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions);
-        assert strategy.getReplicationFactor("DC1") == 3;
-        assert strategy.getReplicationFactor("DC2") == 2;
-        assert strategy.getReplicationFactor("DC3") == 1;
+        assert strategy.getReplicationFactor("DC1").allReplicas == 3;
+        assert strategy.getReplicationFactor("DC2").allReplicas == 2;
+        assert strategy.getReplicationFactor("DC3").allReplicas == 1;
         // Query for the natural hosts
-        ArrayList<InetAddressAndPort> endpoints = strategy.getNaturalEndpoints(new StringToken("123"));
-        assert 6 == endpoints.size();
-        assert 6 == new HashSet<>(endpoints).size(); // ensure uniqueness
+        EndpointsForToken replicas = strategy.getNaturalReplicasForToken(new StringToken("123"));
+        assert 6 == replicas.size();
+        assert 6 == replicas.endpoints().size(); // ensure uniqueness
+        assert 6 == new HashSet<>(replicas.byEndpoint().values()).size(); // ensure uniqueness
     }
 
     @Test
@@ -92,13 +100,14 @@ public class NetworkTopologyStrategyTest
 
         // Set the localhost to the tokenmetadata. Embedded cassandra way?
         NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions);
-        assert strategy.getReplicationFactor("DC1") == 3;
-        assert strategy.getReplicationFactor("DC2") == 3;
-        assert strategy.getReplicationFactor("DC3") == 0;
+        assert strategy.getReplicationFactor("DC1").allReplicas == 3;
+        assert strategy.getReplicationFactor("DC2").allReplicas == 3;
+        assert strategy.getReplicationFactor("DC3").allReplicas == 0;
         // Query for the natural hosts
-        ArrayList<InetAddressAndPort> endpoints = strategy.getNaturalEndpoints(new StringToken("123"));
-        assert 6 == endpoints.size();
-        assert 6 == new HashSet<>(endpoints).size(); // ensure uniqueness
+        EndpointsForToken replicas = strategy.getNaturalReplicasForToken(new StringToken("123"));
+        assert 6 == replicas.size();
+        assert 6 == replicas.endpoints().size(); // ensure uniqueness
+        assert 6 == new HashSet<>(replicas.byEndpoint().values()).size(); // ensure uniqueness
     }
 
     @Test
@@ -137,12 +146,13 @@ public class NetworkTopologyStrategyTest
 
         for (String testToken : new String[]{"123456", "200000", "000402", "ffffff", "400200"})
         {
-            List<InetAddressAndPort> endpoints = strategy.calculateNaturalEndpoints(new StringToken(testToken), metadata);
-            Set<InetAddressAndPort> epSet = new HashSet<>(endpoints);
+            EndpointsForRange replicas = strategy.calculateNaturalReplicas(new StringToken(testToken), metadata);
+            Set<InetAddressAndPort> endpointSet = replicas.endpoints();
 
-            Assert.assertEquals(totalRF, endpoints.size());
-            Assert.assertEquals(totalRF, epSet.size());
-            logger.debug("{}: {}", testToken, endpoints);
+            Assert.assertEquals(totalRF, replicas.size());
+            Assert.assertEquals(totalRF, new HashSet<>(replicas.byEndpoint().values()).size());
+            Assert.assertEquals(totalRF, endpointSet.size());
+            logger.debug("{}: {}", testToken, replicas);
         }
     }
 
@@ -209,7 +219,7 @@ public class NetworkTopologyStrategyTest
         {
             Token token = Murmur3Partitioner.instance.getRandomToken(rand);
             List<InetAddressAndPort> expected = calculateNaturalEndpoints(token, tokenMetadata, datacenters, snitch);
-            List<InetAddressAndPort> actual = nts.calculateNaturalEndpoints(token, tokenMetadata);
+            List<InetAddressAndPort> actual = new ArrayList<>(nts.calculateNaturalReplicas(token, tokenMetadata).endpoints());
             if (endpointsDiffer(expected, actual))
             {
                 System.err.println("Endpoints mismatch for token " + token);
@@ -373,4 +383,61 @@ public class NetworkTopologyStrategyTest
         Integer replicas = datacenters.get(dc);
         return replicas == null ? 0 : replicas;
     }
+
+    /** Shorthand for a Murmur3 {@link LongToken} holding {@code t}. */
+    private static Token tk(long t)
+    {
+        return new LongToken(t);
+    }
+
+    /** Shorthand for the {@link Range} whose left and right tokens are {@code tk(l)} and {@code tk(r)}. */
+    private static Range<Token> range(long l, long r)
+    {
+        Token left = tk(l);
+        Token right = tk(r);
+        return new Range<>(left, right);
+    }
+
+    /**
+     * Four endpoints own tokens 100, 200, 300 and 400, and the snitch's datacenter is configured with
+     * the replication option "3/1". For tokens 99 and 101 the strategy is expected to return three
+     * replicas over the enclosing range: the first two full, the last one transient.
+     */
+    @Test
+    public void testTransientReplica() throws Exception
+    {
+        IEndpointSnitch snitch = new SimpleSnitch();
+        DatabaseDescriptor.setEndpointSnitch(snitch);
+
+        List<InetAddressAndPort> endpoints = Lists.newArrayList(InetAddressAndPort.getByName("127.0.0.1"),
+                                                                InetAddressAndPort.getByName("127.0.0.2"),
+                                                                InetAddressAndPort.getByName("127.0.0.3"),
+                                                                InetAddressAndPort.getByName("127.0.0.4"));
+
+        // endpoint i owns the single token 100 * (i + 1)
+        Multimap<InetAddressAndPort, Token> tokens = HashMultimap.create();
+        for (int i = 0; i < endpoints.size(); i++)
+            tokens.put(endpoints.get(i), tk(100L * (i + 1)));
+
+        TokenMetadata metadata = new TokenMetadata();
+        metadata.updateNormalTokens(tokens);
+
+        String datacenter = snitch.getDatacenter((InetAddressAndPort) null);
+        Map<String, String> configOptions = new HashMap<>();
+        configOptions.put(datacenter, "3/1");
+
+        NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions);
+
+        Range<Token> first = range(400, 100);
+        EndpointsForRange expectedFor99 = EndpointsForRange.of(fullReplica(endpoints.get(0), first),
+                                                               fullReplica(endpoints.get(1), first),
+                                                               transientReplica(endpoints.get(2), first));
+        Assert.assertEquals(expectedFor99, strategy.getNaturalReplicasForToken(tk(99)));
+
+        Range<Token> second = range(100, 200);
+        EndpointsForRange expectedFor101 = EndpointsForRange.of(fullReplica(endpoints.get(1), second),
+                                                                fullReplica(endpoints.get(2), second),
+                                                                transientReplica(endpoints.get(3), second));
+        Assert.assertEquals(expectedFor101, strategy.getNaturalReplicasForToken(tk(101)));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org