You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:35:59 UTC

[07/18] cassandra git commit: Transient Replication and Cheap Quorums

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
index a43e3eb..4af4a92 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.service.reads.repair;
 
-import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 
@@ -27,24 +26,28 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.service.reads.DigestResolver;
 
 /**
  * Bypasses the read repair path for short read protection and testing
  */
-public class NoopReadRepair implements ReadRepair
+public class NoopReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements ReadRepair<E, L>
 {
     public static final NoopReadRepair instance = new NoopReadRepair();
 
     private NoopReadRepair() {}
 
-    public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints)
+    @Override
+    public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicas)
     {
         return UnfilteredPartitionIterators.MergeListener.NOOP;
     }
 
-    public void startRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
+    @Override
+    public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer)
     {
         resultConsumer.accept(digestResolver.getData());
     }
@@ -72,7 +75,7 @@ public class NoopReadRepair implements ReadRepair
     }
 
     @Override
-    public void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> mutations, InetAddressAndPort[] destinations)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout)
     {
 
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
index 6cf761a..4cae3ae 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
@@ -28,18 +28,18 @@ import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaLayout;
 
 public class PartitionIteratorMergeListener implements UnfilteredPartitionIterators.MergeListener
 {
-    private final InetAddressAndPort[] sources;
+    private final ReplicaLayout replicaLayout;
     private final ReadCommand command;
     private final ConsistencyLevel consistency;
     private final ReadRepair readRepair;
 
-    public PartitionIteratorMergeListener(InetAddressAndPort[] sources, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
+    public PartitionIteratorMergeListener(ReplicaLayout replicaLayout, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
     {
-        this.sources = sources;
+        this.replicaLayout = replicaLayout;
         this.command = command;
         this.consistency = consistency;
         this.readRepair = readRepair;
@@ -47,10 +47,10 @@ public class PartitionIteratorMergeListener implements UnfilteredPartitionIterat
 
     public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
     {
-        return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), sources, command, consistency, readRepair);
+        return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), replicaLayout, command, consistency, readRepair);
     }
 
-    private RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
+    protected RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
     {
         Columns statics = Columns.NONE;
         Columns regulars = Columns.NONE;
@@ -66,7 +66,7 @@ public class PartitionIteratorMergeListener implements UnfilteredPartitionIterat
         return new RegularAndStaticColumns(statics, regulars);
     }
 
-    private boolean isReversed(List<UnfilteredRowIterator> versions)
+    protected boolean isReversed(List<UnfilteredRowIterator> versions)
     {
         for (UnfilteredRowIterator iter : versions)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
index d994b23..c13e2d6 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
@@ -21,27 +21,28 @@ package org.apache.cassandra.service.reads.repair;
 import java.util.Map;
 
 import com.codahale.metrics.Meter;
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 
 /**
  * Only performs the collection of data responses and reconciliation of them, doesn't send repair mutations
  * to replicas. This preserves write atomicity, but doesn't provide monotonic quorum reads
  */
-public class ReadOnlyReadRepair extends AbstractReadRepair
+public class ReadOnlyReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractReadRepair<E, L>
 {
-    public ReadOnlyReadRepair(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency)
+    ReadOnlyReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime)
     {
-        super(command, queryStartNanoTime, consistency);
+        super(command, replicaLayout, queryStartNanoTime);
     }
 
     @Override
-    public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints)
+    public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout)
     {
         return UnfilteredPartitionIterators.MergeListener.NOOP;
     }
@@ -59,7 +60,7 @@ public class ReadOnlyReadRepair extends AbstractReadRepair
     }
 
     @Override
-    public void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> mutations, InetAddressAndPort[] destinations)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout)
     {
         throw new UnsupportedOperationException("ReadOnlyReadRepair shouldn't be trying to repair partitions");
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
index 97f0f67..168f003 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
@@ -17,44 +17,45 @@
  */
 package org.apache.cassandra.service.reads.repair;
 
-import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.locator.Endpoints;
+
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.service.reads.DigestResolver;
 
-public interface ReadRepair
+public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
 {
     public interface Factory
     {
-        ReadRepair create(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency);
+        <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime);
+    }
+
+    static <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaPlan, long queryStartNanoTime)
+    {
+        return command.metadata().params.readRepair.create(command, replicaPlan, queryStartNanoTime);
     }
 
     /**
      * Used by DataResolver to generate corrections as the partition iterator is consumed
      */
-    UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints);
+    UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout);
 
     /**
      * Called when the digests from the initial read don't match. Reads may block on the
      * repair started by this method.
      * @param digestResolver supplied so we can get the original data response
-     * @param allEndpoints all available replicas for this read
-     * @param contactedEndpoints the replicas we actually sent requests to
      * @param resultConsumer hook for the repair to set it's result on completion
      */
-    public void startRepair(DigestResolver digestResolver,
-                            List<InetAddressAndPort> allEndpoints,
-                            List<InetAddressAndPort> contactedEndpoints,
-                            Consumer<PartitionIterator> resultConsumer);
+    public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer);
 
     /**
      * Block on the reads (or timeout) sent out in {@link ReadRepair#startRepair}
@@ -81,17 +82,13 @@ public interface ReadRepair
     public void maybeSendAdditionalWrites();
 
     /**
-     * Hook for the merge listener to start repairs on individual partitions.
-     */
-    void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> mutations, InetAddressAndPort[] destinations);
-
-    /**
      * Block on any mutations (or timeout) we sent out to repair replicas in {@link ReadRepair#repairPartition}
      */
     public void awaitWrites();
 
-    static ReadRepair create(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency)
-    {
-        return command.metadata().params.readRepair.create(command, queryStartNanoTime, consistency);
-    }
+    /**
+     * Repairs a partition _after_ receiving data responses. This method receives replica list, since
+     * we will block repair only on the replicas that have responded.
+     */
+    void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
index 1117822..6eff395 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.service.reads.repair;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
@@ -38,8 +39,8 @@ final class ReadRepairDiagnostics
     {
     }
 
-    static void startRepair(AbstractReadRepair readRepair, List<InetAddressAndPort> endpointDestinations,
-                            DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints)
+    static void startRepair(AbstractReadRepair readRepair, Collection<InetAddressAndPort> endpointDestinations,
+                            DigestResolver digestResolver, Collection<InetAddressAndPort> allEndpoints)
     {
         if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.START_REPAIR))
             service.publish(new ReadRepairEvent(ReadRepairEventType.START_REPAIR,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
index 152f7e6..9e14362 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.service.reads.repair;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -48,9 +49,9 @@ final class ReadRepairEvent extends DiagnosticEvent
     private final ConsistencyLevel consistency;
     private final SpeculativeRetryPolicy.Kind speculativeRetry;
     @VisibleForTesting
-    final List<InetAddressAndPort> destinations;
+    final Collection<InetAddressAndPort> destinations;
     @VisibleForTesting
-    final List<InetAddressAndPort> allEndpoints;
+    final Collection<InetAddressAndPort> allEndpoints;
     @Nullable
     private final DigestResolverDebugResult[] digestsByEndpoint;
 
@@ -60,13 +61,13 @@ final class ReadRepairEvent extends DiagnosticEvent
         SPECULATED_READ
     }
 
-    ReadRepairEvent(ReadRepairEventType type, AbstractReadRepair readRepair, List<InetAddressAndPort> destinations,
-                    List<InetAddressAndPort> allEndpoints, DigestResolver digestResolver)
+    ReadRepairEvent(ReadRepairEventType type, AbstractReadRepair readRepair, Collection<InetAddressAndPort> destinations,
+                    Collection<InetAddressAndPort> allEndpoints, DigestResolver digestResolver)
     {
         this.keyspace = readRepair.cfs.keyspace;
         this.tableName = readRepair.cfs.getTableName();
         this.cqlCommand = readRepair.command.toCQLString();
-        this.consistency = readRepair.consistency;
+        this.consistency = readRepair.replicaLayout.consistencyLevel();
         this.speculativeRetry = readRepair.cfs.metadata().params.speculativeRetry.kind();
         this.destinations = destinations;
         this.allEndpoints = allEndpoints;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
index 5945633..28c0e9e 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
@@ -18,26 +18,25 @@
 
 package org.apache.cassandra.service.reads.repair;
 
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaLayout;
 
 public enum ReadRepairStrategy implements ReadRepair.Factory
 {
     NONE
     {
-        @Override
-        public ReadRepair create(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency)
+        public <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime)
         {
-            return new ReadOnlyReadRepair(command, queryStartNanoTime, consistency);
+            return new ReadOnlyReadRepair<>(command, replicaLayout, queryStartNanoTime);
         }
     },
 
     BLOCKING
     {
-        @Override
-        public ReadRepair create(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency)
+        public <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime)
         {
-            return new BlockingReadRepair(command, queryStartNanoTime, consistency);
+            return new BlockingReadRepair<>(command, replicaLayout, queryStartNanoTime);
         }
     };
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index cb6707d..b0c019a 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.service.reads.repair;
 import java.util.Arrays;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
 import org.apache.cassandra.db.Clustering;
@@ -43,7 +44,9 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowDiffListener;
 import org.apache.cassandra.db.rows.Rows;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.schema.ColumnMetadata;
 
 public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeListener
@@ -51,14 +54,14 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
     private final DecoratedKey partitionKey;
     private final RegularAndStaticColumns columns;
     private final boolean isReversed;
-    private final InetAddressAndPort[] sources;
     private final ReadCommand command;
     private final ConsistencyLevel consistency;
 
     private final PartitionUpdate.Builder[] repairs;
-
+    private final Replica[] sources;
     private final Row.Builder[] currentRows;
     private final RowDiffListener diffListener;
+    private final ReplicaLayout layout;
 
     // The partition level deletion for the merge row.
     private DeletionTime partitionLevelDeletion;
@@ -71,16 +74,21 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
 
     private final ReadRepair readRepair;
 
-    public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, InetAddressAndPort[] sources, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
+    public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaLayout layout, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
     {
         this.partitionKey = partitionKey;
         this.columns = columns;
         this.isReversed = isReversed;
-        this.sources = sources;
-        repairs = new PartitionUpdate.Builder[sources.length];
-        currentRows = new Row.Builder[sources.length];
-        sourceDeletionTime = new DeletionTime[sources.length];
-        markerToRepair = new ClusteringBound[sources.length];
+        Endpoints<?> sources = layout.selected();
+        this.sources = new Replica[sources.size()];
+        for (int i = 0; i < sources.size(); i++)
+            this.sources[i] = sources.get(i);
+
+        this.layout = layout;
+        repairs = new PartitionUpdate.Builder[sources.size()];
+        currentRows = new Row.Builder[sources.size()];
+        sourceDeletionTime = new DeletionTime[sources.size()];
+        markerToRepair = new ClusteringBound[sources.size()];
         this.command = command;
         this.consistency = consistency;
         this.readRepair = readRepair;
@@ -89,25 +97,25 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
         {
             public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
             {
-                if (merged != null && !merged.equals(original))
+                if (merged != null && !merged.equals(original) && !isTransient(i))
                     currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged);
             }
 
             public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
             {
-                if (merged != null && !merged.equals(original))
+                if (merged != null && !merged.equals(original) && !isTransient(i))
                     currentRow(i, clustering).addRowDeletion(merged);
             }
 
             public void onComplexDeletion(int i, Clustering clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original)
             {
-                if (merged != null && !merged.equals(original))
+                if (merged != null && !merged.equals(original) && !isTransient(i))
                     currentRow(i, clustering).addComplexDeletion(column, merged);
             }
 
             public void onCell(int i, Clustering clustering, Cell merged, Cell original)
             {
-                if (merged != null && !merged.equals(original) && isQueried(merged))
+                if (merged != null && !merged.equals(original) && isQueried(merged) && !isTransient(i))
                     currentRow(i, clustering).addCell(merged);
             }
 
@@ -126,6 +134,11 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
         };
     }
 
+    private boolean isTransient(int i)
+    {
+        return sources[i].isTransient();
+    }
+
     private PartitionUpdate.Builder update(int i)
     {
         if (repairs[i] == null)
@@ -159,6 +172,9 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
         this.partitionLevelDeletion = mergedDeletion;
         for (int i = 0; i < versions.length; i++)
         {
+            if (isTransient(i))
+                continue;
+
             if (mergedDeletion.supersedes(versions[i]))
                 update(i).addPartitionDeletion(mergedDeletion);
         }
@@ -193,6 +209,9 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
 
         for (int i = 0; i < versions.length; i++)
         {
+            if (isTransient(i))
+                continue;
+
             RangeTombstoneMarker marker = versions[i];
 
             // Update what the source now thinks is the current deletion
@@ -245,12 +264,12 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
                     if (!marker.isBoundary() && marker.isOpen(isReversed)) // (1)
                     {
                         assert currentDeletion.equals(marker.openDeletionTime(isReversed))
-                            : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
+                        : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
                     }
                     else // (2)
                     {
                         assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed))
-                            : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
+                        : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
                     }
 
                     // and so unless it's a boundary whose opening deletion time is still equal to the current
@@ -306,13 +325,14 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
 
     public void close()
     {
-        Map<InetAddressAndPort, Mutation> mutations = null;
+        Map<Replica, Mutation> mutations = null;
         for (int i = 0; i < repairs.length; i++)
         {
             if (repairs[i] == null)
                 continue;
 
-            Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, sources[i], false);
+            Preconditions.checkState(!isTransient(i), "cannot read repair transient replicas");
+            Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, sources[i].endpoint(), false);
             if (mutation == null)
                 continue;
 
@@ -324,7 +344,7 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
 
         if (mutations != null)
         {
-            readRepair.repairPartition(partitionKey, mutations, sources);
+            readRepair.repairPartition(partitionKey, mutations, layout);
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
index 609d2a0..38c25dc 100644
--- a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
+++ b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,8 +44,10 @@ public class DefaultConnectionFactory implements StreamConnectionFactory
 
     private static final int DEFAULT_CHANNEL_BUFFER_SIZE = 1 << 22;
 
-    private static final long MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(30);
-    private static final int MAX_CONNECT_ATTEMPTS = 3;
+    @VisibleForTesting
+    public static long MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(30);
+    @VisibleForTesting
+    public static int MAX_CONNECT_ATTEMPTS = 3;
 
     @Override
     public Channel createConnection(OutboundConnectionIdentifier connectionId, int protocolVersion) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index b56f165..2f6deb5 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -19,11 +19,14 @@ package org.apache.cassandra.streaming;
 
 import java.util.*;
 
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.utils.UUIDGen;
 
+import static com.google.common.collect.Iterables.all;
 import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
 
 /**
@@ -69,12 +72,13 @@ public class StreamPlan
      *
      * @param from endpoint address to fetch data from.
      * @param keyspace name of keyspace
-     * @param ranges ranges to fetch
+     * @param fullRanges ranges to fetch that from provides the full version of
+     * @param transientRanges ranges to fetch that from provides only transient data of
      * @return this object for chaining
      */
-    public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, Collection<Range<Token>> ranges)
+    public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges)
     {
-        return requestRanges(from, keyspace, ranges, EMPTY_COLUMN_FAMILIES);
+        return requestRanges(from, keyspace, fullRanges, transientRanges, EMPTY_COLUMN_FAMILIES);
     }
 
     /**
@@ -82,14 +86,20 @@ public class StreamPlan
      *
      * @param from endpoint address to fetch data from.
      * @param keyspace name of keyspace
-     * @param ranges ranges to fetch
+     * @param fullRanges ranges to fetch that from provides the full data for
+     * @param transientRanges ranges to fetch that from provides only transient data for
      * @param columnFamilies specific column families
      * @return this object for chaining
      */
-    public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
+    public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, String... columnFamilies)
     {
+        //It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node
+        assert all(fullRanges, Replica::isLocal) || all(fullRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) :
+             fullRanges.toString();
+        assert all(transientRanges, Replica::isLocal) || all(transientRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) :
+        transientRanges.toString();
         StreamSession session = coordinator.getOrCreateNextSession(from);
-        session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies));
+        session.addStreamRequest(keyspace, fullRanges, transientRanges, Arrays.asList(columnFamilies));
         return this;
     }
 
@@ -98,14 +108,14 @@ public class StreamPlan
      *
      * @param to endpoint address of receiver
      * @param keyspace name of keyspace
-     * @param ranges ranges to send
+     * @param replicas ranges to send
      * @param columnFamilies specific column families
      * @return this object for chaining
      */
-    public StreamPlan transferRanges(InetAddressAndPort to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
+    public StreamPlan transferRanges(InetAddressAndPort to, String keyspace, RangesAtEndpoint replicas, String... columnFamilies)
     {
         StreamSession session = coordinator.getOrCreateNextSession(to);
-        session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer);
+        session.addTransferRanges(keyspace, replicas, Arrays.asList(columnFamilies), flushBeforeTransfer);
         return this;
     }
 
@@ -182,4 +192,10 @@ public class StreamPlan
     {
         return flushBeforeTransfer;
     }
+
+    @VisibleForTesting
+    public StreamCoordinator getCoordinator()
+    {
+        return coordinator;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/streaming/StreamRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java
index 4a3761e..f37268f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequest.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java
@@ -29,6 +29,10 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
 
 public class StreamRequest
@@ -36,12 +40,23 @@ public class StreamRequest
     public static final IVersionedSerializer<StreamRequest> serializer = new StreamRequestSerializer();
 
     public final String keyspace;
-    public final Collection<Range<Token>> ranges;
+    //Full replicas and transient replicas are split based on the transient status of the remote we are fetching
+    //from. We preserve this distinction so on completion we can log to a system table whether we got the data transiently
+    //or fully from some remote. This is an important distinction for resumable bootstrap. The Replicas in these collections
+    //are local replicas (or dummy if this is triggered by repair) and don't encode the necessary information about
+    //what the remote provided.
+    public final RangesAtEndpoint full;
+    public final RangesAtEndpoint transientReplicas;
     public final Collection<String> columnFamilies = new HashSet<>();
-    public StreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies)
+
+    public StreamRequest(String keyspace, RangesAtEndpoint full, RangesAtEndpoint transientReplicas, Collection<String> columnFamilies)
     {
         this.keyspace = keyspace;
-        this.ranges = ranges;
+        if (!full.endpoint().equals(transientReplicas.endpoint()))
+            throw new IllegalStateException("Mismatching endpoints: " + full + ", " + transientReplicas);
+
+        this.full = full;
+        this.transientReplicas = transientReplicas;
         this.columnFamilies.addAll(columnFamilies);
     }
 
@@ -50,49 +65,82 @@ public class StreamRequest
         public void serialize(StreamRequest request, DataOutputPlus out, int version) throws IOException
         {
             out.writeUTF(request.keyspace);
-            out.writeInt(request.ranges.size());
-            for (Range<Token> range : request.ranges)
-            {
-                MessagingService.validatePartitioner(range);
-                Token.serializer.serialize(range.left, out, version);
-                Token.serializer.serialize(range.right, out, version);
-            }
             out.writeInt(request.columnFamilies.size());
+
+            CompactEndpointSerializationHelper.streamingInstance.serialize(request.full.endpoint(), out, version);
+            serializeReplicas(request.full, out, version);
+            serializeReplicas(request.transientReplicas, out, version);
             for (String cf : request.columnFamilies)
                 out.writeUTF(cf);
         }
 
-        public StreamRequest deserialize(DataInputPlus in, int version) throws IOException
+        private void serializeReplicas(RangesAtEndpoint replicas, DataOutputPlus out, int version) throws IOException
         {
-            String keyspace = in.readUTF();
-            int rangeCount = in.readInt();
-            List<Range<Token>> ranges = new ArrayList<>(rangeCount);
-            for (int i = 0; i < rangeCount; i++)
+            out.writeInt(replicas.size());
+
+            for (Replica replica : replicas)
             {
-                Token left = Token.serializer.deserialize(in, MessagingService.globalPartitioner(), version);
-                Token right = Token.serializer.deserialize(in, MessagingService.globalPartitioner(), version);
-                ranges.add(new Range<>(left, right));
+                MessagingService.validatePartitioner(replica.range());
+                Token.serializer.serialize(replica.range().left, out, version);
+                Token.serializer.serialize(replica.range().right, out, version);
             }
+        }
+
+        public StreamRequest deserialize(DataInputPlus in, int version) throws IOException
+        {
+            String keyspace = in.readUTF();
             int cfCount = in.readInt();
+            InetAddressAndPort endpoint = CompactEndpointSerializationHelper.streamingInstance.deserialize(in, version);
+
+            RangesAtEndpoint full = deserializeReplicas(in, version, endpoint, true);
+            RangesAtEndpoint transientReplicas = deserializeReplicas(in, version, endpoint, false);
             List<String> columnFamilies = new ArrayList<>(cfCount);
             for (int i = 0; i < cfCount; i++)
                 columnFamilies.add(in.readUTF());
-            return new StreamRequest(keyspace, ranges, columnFamilies);
+            return new StreamRequest(keyspace, full, transientReplicas, columnFamilies);
         }
 
-        public long serializedSize(StreamRequest request, int version)
+        RangesAtEndpoint deserializeReplicas(DataInputPlus in, int version, InetAddressAndPort endpoint, boolean isFull) throws IOException
         {
-            int size = TypeSizes.sizeof(request.keyspace);
-            size += TypeSizes.sizeof(request.ranges.size());
-            for (Range<Token> range : request.ranges)
+            int replicaCount = in.readInt();
+
+            RangesAtEndpoint.Builder replicas = RangesAtEndpoint.builder(endpoint, replicaCount);
+            for (int i = 0; i < replicaCount; i++)
             {
-                size += Token.serializer.serializedSize(range.left, version);
-                size += Token.serializer.serializedSize(range.right, version);
+                //TODO, super need to review the usage of streaming vs not streaming endpoint serialization helper
+                //to make sure I'm not using the wrong one some of the time, like do repair messages use the
+                //streaming version?
+                Token left = Token.serializer.deserialize(in, MessagingService.globalPartitioner(), version);
+                Token right = Token.serializer.deserialize(in, MessagingService.globalPartitioner(), version);
+                replicas.add(new Replica(endpoint, new Range<>(left, right), isFull));
             }
+            return replicas.build();
+        }
+
+        public long serializedSize(StreamRequest request, int version)
+        {
+            int size = TypeSizes.sizeof(request.keyspace);
             size += TypeSizes.sizeof(request.columnFamilies.size());
+            size += CompactEndpointSerializationHelper.streamingInstance.serializedSize(request.full.endpoint(), version);
+            size += replicasSerializedSize(request.transientReplicas, version);
+            size += replicasSerializedSize(request.full, version);
             for (String cf : request.columnFamilies)
                 size += TypeSizes.sizeof(cf);
             return size;
         }
+
+        private long replicasSerializedSize(RangesAtEndpoint replicas, int version)
+        {
+            long size = 0;
+            size += TypeSizes.sizeof(replicas.size());
+
+            for (Replica replica : replicas)
+            {
+                size += Token.serializer.serializedSize(replica.range().left, version);
+                size += Token.serializer.serializedSize(replica.range().right, version);
+            }
+            return size;
+        }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 393cd24..ec80772 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -29,6 +29,7 @@ import com.google.common.util.concurrent.Futures;
 
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +41,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.metrics.StreamingMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
@@ -49,6 +51,8 @@ import org.apache.cassandra.streaming.messages.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
+import static com.google.common.collect.Iterables.all;
+
 /**
  * Handles the streaming a one or more streams to and from a specific remote node.
  *
@@ -243,7 +247,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
 
     public StreamReceiver getAggregator(TableId tableId)
     {
-        assert receivers.containsKey(tableId);
+        assert receivers.containsKey(tableId) : "Missing tableId " + tableId;
         return receivers.get(tableId).getReceiver();
     }
 
@@ -297,38 +301,52 @@ public class StreamSession implements IEndpointStateChangeSubscriber
      * Request data fetch task to this session.
      *
      * @param keyspace Requesting keyspace
-     * @param ranges Ranges to retrieve data
+     * @param fullRanges Ranges to retrieve data that will return full data from the source
+     * @param transientRanges Ranges to retrieve data that will return transient data from the source
      * @param columnFamilies ColumnFamily names. Can be empty if requesting all CF under the keyspace.
      */
-    public void addStreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies)
+    public void addStreamRequest(String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, Collection<String> columnFamilies)
     {
-        requests.add(new StreamRequest(keyspace, ranges, columnFamilies));
+        //It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node
+        assert all(fullRanges, Replica::isLocal) || all(fullRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) : fullRanges.toString();
+        assert all(transientRanges, Replica::isLocal) || all(transientRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) : transientRanges.toString();
+        requests.add(new StreamRequest(keyspace, fullRanges, transientRanges, columnFamilies));
     }
 
     /**
      * Set up transfer for specific keyspace/ranges/CFs
      *
      * @param keyspace Transfer keyspace
-     * @param ranges Transfer ranges
+     * @param replicas Transfer ranges
      * @param columnFamilies Transfer ColumnFamilies
      * @param flushTables flush tables?
      */
-    synchronized void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables)
+    synchronized void addTransferRanges(String keyspace, RangesAtEndpoint replicas, Collection<String> columnFamilies, boolean flushTables)
     {
         failIfFinished();
         Collection<ColumnFamilyStore> stores = getColumnFamilyStores(keyspace, columnFamilies);
         if (flushTables)
             flushSSTables(stores);
 
-        List<Range<Token>> normalizedRanges = Range.normalize(ranges);
-        List<OutgoingStream> streams = getOutgoingStreamsForRanges(normalizedRanges, stores, pendingRepair, previewKind);
+        //Was it safe to remove this normalize, sorting seems not to matter, merging? Maybe we should have?
+        //Do we need to unwrap here also or is that just making it worse?
+        //Range and if it's transient
+        RangesAtEndpoint.Builder unwrappedRanges = RangesAtEndpoint.builder(replicas.endpoint(), replicas.size());
+        for (Replica replica : replicas)
+        {
+            for (Range<Token> unwrapped : replica.range().unwrap())
+            {
+                unwrappedRanges.add(new Replica(replica.endpoint(), unwrapped, replica.isFull()));
+            }
+        }
+        List<OutgoingStream> streams = getOutgoingStreamsForRanges(unwrappedRanges.build(), stores, pendingRepair, previewKind);
         addTransferStreams(streams);
         Set<Range<Token>> toBeUpdated = transferredRangesPerKeyspace.get(keyspace);
         if (toBeUpdated == null)
         {
             toBeUpdated = new HashSet<>();
         }
-        toBeUpdated.addAll(ranges);
+        toBeUpdated.addAll(replicas.ranges());
         transferredRangesPerKeyspace.put(keyspace, toBeUpdated);
     }
 
@@ -355,14 +373,14 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     }
 
     @VisibleForTesting
-    public List<OutgoingStream> getOutgoingStreamsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, UUID pendingRepair, PreviewKind previewKind)
+    public List<OutgoingStream> getOutgoingStreamsForRanges(RangesAtEndpoint replicas, Collection<ColumnFamilyStore> stores, UUID pendingRepair, PreviewKind previewKind)
     {
         List<OutgoingStream> streams = new ArrayList<>();
         try
         {
             for (ColumnFamilyStore cfs: stores)
             {
-                streams.addAll(cfs.getStreamManager().createOutgoingStreams(this, ranges, pendingRepair, previewKind));
+                streams.addAll(cfs.getStreamManager().createOutgoingStreams(this, replicas, pendingRepair, previewKind));
             }
         }
         catch (Throwable t)
@@ -561,7 +579,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     {
 
         for (StreamRequest request : requests)
-            addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true); // always flush on stream request
+            addTransferRanges(request.keyspace, RangesAtEndpoint.concat(request.full, request.transientReplicas), request.columnFamilies, true); // always flush on stream request
         for (StreamSummary summary : summaries)
             prepareReceiving(summary);
 
@@ -812,4 +830,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         }
         maybeCompleted();
     }
+
+    @VisibleForTesting
+    public int getNumRequests()
+    {
+        return requests.size();
+    }
+
+    @VisibleForTesting
+    public int getNumTransfers()
+    {
+        return transferredRangesPerKeyspace.size();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/streaming/TableStreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/TableStreamManager.java b/src/java/org/apache/cassandra/streaming/TableStreamManager.java
index 11512e9..d97fabc 100644
--- a/src/java/org/apache/cassandra/streaming/TableStreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/TableStreamManager.java
@@ -21,8 +21,7 @@ package org.apache.cassandra.streaming;
 import java.util.Collection;
 import java.util.UUID;
 
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.apache.cassandra.streaming.messages.StreamMessageHeader;
 
 /**
@@ -46,12 +45,12 @@ public interface TableStreamManager
 
     /**
      * Returns a collection of {@link OutgoingStream}s that contains the data selected by the
-     * given ranges, pendingRepair, and preview.
+     * given replicas, pendingRepair, and preview.
      *
      * There aren't any requirements on how data is divided between the outgoing streams
      */
     Collection<OutgoingStream> createOutgoingStreams(StreamSession session,
-                                                     Collection<Range<Token>> ranges,
+                                                     RangesAtEndpoint replicas,
                                                      UUID pendingRepair,
                                                      PreviewKind previewKind);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 03b8af0..54187d1 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -816,6 +816,11 @@ public class NodeProbe implements AutoCloseable
         return ssProxy.getNaturalEndpoints(keyspace, cf, key);
     }
 
+    public List<String> getReplicas(String keyspace, String cf, String key)
+    {
+        return ssProxy.getReplicas(keyspace, cf, key);
+    }
+
     public List<String> getSSTables(String keyspace, String cf, String key, boolean hexFormat)
     {
         ColumnFamilyStoreMBean cfsProxy = getCfsProxy(keyspace, cf);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index c2193d4..1d09b0f 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -190,6 +190,8 @@ public class NodeTool
                 ReloadSslCertificates.class,
                 EnableAuditLog.class,
                 DisableAuditLog.class,
+                GetReplicas.class,
+                DisableAuditLog.class,
                 EnableOldProtocolVersions.class,
                 DisableOldProtocolVersions.class
         );

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
index 8056ff8..31d80fa 100644
--- a/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
+++ b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
@@ -88,11 +88,11 @@ public class SSTableRepairedAtSetter
             if (setIsRepaired)
             {
                 FileTime f = Files.getLastModifiedTime(new File(descriptor.filenameFor(Component.DATA)).toPath());
-                descriptor.getMetadataSerializer().mutateRepaired(descriptor, f.toMillis(), null);
+                descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, f.toMillis(), null, false);
             }
             else
             {
-                descriptor.getMetadataSerializer().mutateRepaired(descriptor, 0, null);
+                descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, 0, null, false);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/tools/nodetool/GetReplicas.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetReplicas.java b/src/java/org/apache/cassandra/tools/nodetool/GetReplicas.java
new file mode 100644
index 0000000..4c401fc
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/GetReplicas.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+@Command(name = "getreplicas", description = "Print replicas for a given key")
+public class GetReplicas extends NodeTool.NodeToolCmd
+{
+    @Arguments(usage = "<keyspace> <table> <key>", description = "The keyspace, the table, and the partition key for which we need to find replicas")
+    private List<String> args = new ArrayList<>();
+
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        checkArgument(args.size() == 3, "getreplicas requires keyspace, table and partition key arguments");
+        String ks = args.get(0);
+        String table = args.get(1);
+        String key = args.get(2);
+
+        System.out.println(probe.getReplicas(ks, table, key));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index a53846c..1e0813c 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -161,7 +161,7 @@ public abstract class TraceState implements ProgressEventNotifier
         trace(MessageFormatter.format(format, arg1, arg2).getMessage());
     }
 
-    public void trace(String format, Object[] args)
+    public void trace(String format, Object... args)
     {
         trace(MessageFormatter.arrayFormat(format, args).getMessage());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 84af41c..8e0b19f 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -68,7 +68,7 @@ public class ErrorMessage extends Message.Response
                         ConsistencyLevel cl = CBUtil.readConsistencyLevel(body);
                         int required = body.readInt();
                         int alive = body.readInt();
-                        te = new UnavailableException(cl, required, alive);
+                        te = UnavailableException.create(cl, required, alive);
                     }
                     break;
                 case OVERLOADED:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/utils/Pair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Pair.java b/src/java/org/apache/cassandra/utils/Pair.java
index ea8b8fc..cb09529 100644
--- a/src/java/org/apache/cassandra/utils/Pair.java
+++ b/src/java/org/apache/cassandra/utils/Pair.java
@@ -53,6 +53,18 @@ public class Pair<T1, T2>
         return "(" + left + "," + right + ")";
     }
 
+    //For functional interfaces
+    public T1 left()
+    {
+        return left;
+    }
+
+    //For functional interfaces
+    public T2 right()
+    {
+        return right;
+    }
+
     public static <X, Y> Pair<X, Y> create(X x, Y y)
     {
         return new Pair<X, Y>(x, y);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
index e80faca..0c097a6 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
@@ -18,6 +18,8 @@
 */
 package org.apache.cassandra.utils.concurrent;
 
+import java.util.AbstractCollection;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
@@ -27,7 +29,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
  *
  * @param <E>
  */
-public class Accumulator<E> implements Iterable<E>
+public class Accumulator<E>
 {
     private volatile int nextIndex;
     private volatile int presentCount;
@@ -105,7 +107,7 @@ public class Accumulator<E> implements Iterable<E>
         return values.length;
     }
 
-    public Iterator<E> iterator()
+    private Iterator<E> iterator(int count)
     {
         return new Iterator<E>()
         {
@@ -113,7 +115,7 @@ public class Accumulator<E> implements Iterable<E>
 
             public boolean hasNext()
             {
-                return p < presentCount;
+                return p < count;
             }
 
             public E next()
@@ -135,4 +137,23 @@ public class Accumulator<E> implements Iterable<E>
             throw new IndexOutOfBoundsException();
         return (E) values[i];
     }
+
+    public Collection<E> snapshot()
+    {
+        int count = presentCount;
+        return new AbstractCollection<E>()
+        {
+            @Override
+            public Iterator<E> iterator()
+            {
+                return Accumulator.this.iterator(count);
+            }
+
+            @Override
+            public int size()
+            {
+                return count;
+            }
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-CompressionInfo.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-CompressionInfo.db
index ceaa5a3..8fad34f 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-CompressionInfo.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-CompressionInfo.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Data.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Data.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Data.db
index 6968720..ae35335 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Data.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Data.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Digest.crc32
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Digest.crc32 b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Digest.crc32
index f1c192b..8a92f3c 100644
--- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Digest.crc32
+++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Digest.crc32
@@ -1 +1 @@
-4004129384
\ No newline at end of file
+2977407251
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Index.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Index.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Index.db
index af16195..d50fdeb 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Index.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Index.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Statistics.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Statistics.db
index 970e385..7341864 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Statistics.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Statistics.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-TOC.txt b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-TOC.txt
index bb800f8..b03b283 100644
--- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-TOC.txt
+++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-TOC.txt
@@ -1,8 +1,8 @@
-Digest.crc32
 Filter.db
-CompressionInfo.db
+Digest.crc32
 Index.db
-Summary.db
-Data.db
 TOC.txt
+Summary.db
 Statistics.db
+CompressionInfo.db
+Data.db

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-CompressionInfo.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-CompressionInfo.db
index f5ad4d0..f0a1cfb 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-CompressionInfo.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-CompressionInfo.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Data.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Data.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Data.db
index 7217716..b487fe8 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Data.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Data.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Digest.crc32
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Digest.crc32 b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Digest.crc32
index 4f1391a..ca286e0 100644
--- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Digest.crc32
+++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Digest.crc32
@@ -1 +1 @@
-4072239034
\ No newline at end of file
+2759187708
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Index.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Index.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Index.db
index 6dd3da6..c981a22 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Index.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Index.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Statistics.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Statistics.db
index 3a0e63f..33fccc9 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Statistics.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Statistics.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-TOC.txt b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-TOC.txt
index bb800f8..b03b283 100644
--- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-TOC.txt
+++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-TOC.txt
@@ -1,8 +1,8 @@
-Digest.crc32
 Filter.db
-CompressionInfo.db
+Digest.crc32
 Index.db
-Summary.db
-Data.db
 TOC.txt
+Summary.db
 Statistics.db
+CompressionInfo.db
+Data.db

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Data.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Data.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Data.db
index c665dfb..11219d0 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Data.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Data.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Digest.crc32
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Digest.crc32 b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Digest.crc32
index c6c24a7..985d6dc 100644
--- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Digest.crc32
+++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Digest.crc32
@@ -1 +1 @@
-3772296151
\ No newline at end of file
+462858821
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Statistics.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Statistics.db
index 6741430..3c68ac5 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Statistics.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Statistics.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-TOC.txt b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-TOC.txt
index bb800f8..b03b283 100644
--- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-TOC.txt
+++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-TOC.txt
@@ -1,8 +1,8 @@
-Digest.crc32
 Filter.db
-CompressionInfo.db
+Digest.crc32
 Index.db
-Summary.db
-Data.db
 TOC.txt
+Summary.db
 Statistics.db
+CompressionInfo.db
+Data.db

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Data.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Data.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Data.db
index d9fe576..620cdf2 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Data.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Data.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Digest.crc32
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Digest.crc32 b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Digest.crc32
index de7baed..bc5f671 100644
--- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Digest.crc32
+++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Digest.crc32
@@ -1 +1 @@
-4035692752
\ No newline at end of file
+3987542254
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Statistics.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Statistics.db
index e9556d1..689bec8 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Statistics.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Statistics.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-TOC.txt b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-TOC.txt
index bb800f8..b03b283 100644
--- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-TOC.txt
+++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-TOC.txt
@@ -1,8 +1,8 @@
-Digest.crc32
 Filter.db
-CompressionInfo.db
+Digest.crc32
 Index.db
-Summary.db
-Data.db
 TOC.txt
+Summary.db
 Statistics.db
+CompressionInfo.db
+Data.db

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java b/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java
index a5025a3..94a3bd3 100644
--- a/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java
+++ b/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java
@@ -54,19 +54,21 @@ public class DynamicEndpointSnitchLongTest
             DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode()));
             InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
 
-            List<InetAddressAndPort> hosts = new ArrayList<>();
+            EndpointsForRange.Builder replicasBuilder = EndpointsForRange.builder(ReplicaUtils.FULL_RANGE);
             // We want a big list of hosts so  sorting takes time, making it much more likely to reproduce the
             // problem we're looking for.
             for (int i = 0; i < 100; i++)
                 for (int j = 0; j < 256; j++)
-                    hosts.add(InetAddressAndPort.getByAddress(new byte[]{ 127, 0, (byte)i, (byte)j}));
+                    replicasBuilder.add(ReplicaUtils.full(InetAddressAndPort.getByAddress(new byte[]{ 127, 0, (byte)i, (byte)j})));
 
-            ScoreUpdater updater = new ScoreUpdater(dsnitch, hosts);
+            EndpointsForRange replicas = replicasBuilder.build();
+
+            ScoreUpdater updater = new ScoreUpdater(dsnitch, replicas);
             updater.start();
 
-            List<InetAddressAndPort> result = null;
+            EndpointsForRange result = replicas;
             for (int i = 0; i < ITERATIONS; i++)
-                result = dsnitch.getSortedListByProximity(self, hosts);
+                result = dsnitch.sortedByProximity(self, result);
 
             updater.stopped = true;
             updater.join();
@@ -84,10 +86,10 @@ public class DynamicEndpointSnitchLongTest
         public volatile boolean stopped;
 
         private final DynamicEndpointSnitch dsnitch;
-        private final List<InetAddressAndPort> hosts;
+        private final EndpointsForRange hosts;
         private final Random random = new Random();
 
-        public ScoreUpdater(DynamicEndpointSnitch dsnitch, List<InetAddressAndPort> hosts)
+        public ScoreUpdater(DynamicEndpointSnitch dsnitch, EndpointsForRange hosts)
         {
             this.dsnitch = dsnitch;
             this.hosts = hosts;
@@ -97,9 +99,9 @@ public class DynamicEndpointSnitchLongTest
         {
             while (!stopped)
             {
-                InetAddressAndPort host = hosts.get(random.nextInt(hosts.size()));
+                Replica host = hosts.get(random.nextInt(hosts.size()));
                 int score = random.nextInt(SCORE_RANGE);
-                dsnitch.receiveTiming(host, score);
+                dsnitch.receiveTiming(host.endpoint(), score);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
index 01e67f0..e37045a 100644
--- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
+++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
@@ -33,11 +33,10 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.sstable.CQLSSTableWriter;
 import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadataRef;
@@ -121,8 +120,8 @@ public class LongStreamingTest
             private String ks;
             public void init(String keyspace)
             {
-                for (Range<Token> range : StorageService.instance.getLocalRanges(KS))
-                    addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort());
+                for (Replica range : StorageService.instance.getLocalReplicas(KS))
+                    addRangeForEndpoint(range.range(), FBUtilities.getBroadcastAddressAndPort());
 
                 this.ks = keyspace;
             }
@@ -148,8 +147,8 @@ public class LongStreamingTest
             private String ks;
             public void init(String keyspace)
             {
-                for (Range<Token> range : StorageService.instance.getLocalRanges(KS))
-                    addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort());
+                for (Replica range : StorageService.instance.getLocalReplicas(KS))
+                    addRangeForEndpoint(range.range(), FBUtilities.getBroadcastAddressAndPort());
 
                 this.ks = keyspace;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
index 68cfd7e..73a2b71 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
@@ -21,18 +21,22 @@
 package org.apache.cassandra.test.microbench;
 
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.PendingRangeMaps;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaUtils;
 import org.openjdk.jmh.annotations.*;
 import org.openjdk.jmh.infra.Blackhole;
 
 import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
@@ -50,7 +54,7 @@ public class PendingRangesBench
     PendingRangeMaps pendingRangeMaps;
     int maxToken = 256 * 100;
 
-    Multimap<Range<Token>, InetAddressAndPort> oldPendingRanges;
+    Multimap<Range<Token>, Replica> oldPendingRanges;
 
     private Range<Token> genRange(String left, String right)
     {
@@ -63,15 +67,17 @@ public class PendingRangesBench
         pendingRangeMaps = new PendingRangeMaps();
         oldPendingRanges = HashMultimap.create();
 
-        InetAddressAndPort[] addresses = { InetAddressAndPort.getByName("127.0.0.1"), InetAddressAndPort.getByName("127.0.0.2")};
+        List<InetAddressAndPort> endpoints = Lists.newArrayList(InetAddressAndPort.getByName("127.0.0.1"),
+                                                                InetAddressAndPort.getByName("127.0.0.2"));
 
         for (int i = 0; i < maxToken; i++)
         {
             for (int j = 0; j < ThreadLocalRandom.current().nextInt(2); j ++)
             {
                 Range<Token> range = genRange(Integer.toString(i * 10 + 5), Integer.toString(i * 10 + 15));
-                pendingRangeMaps.addPendingRange(range, addresses[j]);
-                oldPendingRanges.put(range, addresses[j]);
+                Replica replica = Replica.fullReplica(endpoints.get(j), range);
+                pendingRangeMaps.addPendingRange(range, replica);
+                oldPendingRanges.put(range, replica);
             }
         }
 
@@ -79,8 +85,9 @@ public class PendingRangesBench
         for (int j = 0; j < ThreadLocalRandom.current().nextInt(2); j ++)
         {
             Range<Token> range = genRange(Integer.toString(maxToken * 10 + 5), Integer.toString(5));
-            pendingRangeMaps.addPendingRange(range, addresses[j]);
-            oldPendingRanges.put(range, addresses[j]);
+            Replica replica = Replica.fullReplica(endpoints.get(j), range);
+            pendingRangeMaps.addPendingRange(range, replica);
+            oldPendingRanges.put(range, replica);
         }
     }
 
@@ -97,13 +104,13 @@ public class PendingRangesBench
     {
         int randomToken = ThreadLocalRandom.current().nextInt(maxToken * 10 + 5);
         Token searchToken = new RandomPartitioner.BigIntegerToken(Integer.toString(randomToken));
-        Set<InetAddressAndPort> endpoints = new HashSet<>();
-        for (Map.Entry<Range<Token>, Collection<InetAddressAndPort>> entry : oldPendingRanges.asMap().entrySet())
+        Set<Replica> replicas = new HashSet<>();
+        for (Map.Entry<Range<Token>, Collection<Replica>> entry : oldPendingRanges.asMap().entrySet())
         {
             if (entry.getKey().contains(searchToken))
-                endpoints.addAll(entry.getValue());
+                replicas.addAll(entry.getValue());
         }
-        bh.consume(endpoints);
+        bh.consume(replicas);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 1201efa..bc2c19c 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -32,9 +32,11 @@ import java.util.function.Supplier;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import org.apache.commons.lang3.StringUtils;
 
+import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,6 +75,7 @@ import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 public class Util
@@ -446,6 +449,14 @@ public class Util
         }
     }
 
+    public static void consume(UnfilteredPartitionIterator iterator)
+    {
+        while (iterator.hasNext())
+        {
+            consume(iterator.next());
+        }
+    }
+
     public static int size(PartitionIterator iter)
     {
         int size = 0;
@@ -478,6 +489,15 @@ public class Util
             && Iterators.elementsEqual(a, b);
     }
 
+    public static boolean sameContent(RowIterator a, RowIterator b)
+    {
+        return Objects.equals(a.metadata(), b.metadata())
+               && Objects.equals(a.isReverseOrder(), b.isReverseOrder())
+               && Objects.equals(a.partitionKey(), b.partitionKey())
+               && Objects.equals(a.staticRow(), b.staticRow())
+               && Iterators.elementsEqual(a, b);
+    }
+
     public static boolean sameContent(Mutation a, Mutation b)
     {
         if (!a.key().equals(b.key()) || !a.getTableIds().equals(b.getTableIds()))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 07ab3dc..782e3b1 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -104,6 +104,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.io.util.DataOutputPlus",
     "org.apache.cassandra.io.util.DiskOptimizationStrategy",
     "org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy",
+    "org.apache.cassandra.locator.Replica",
     "org.apache.cassandra.locator.SimpleSeedProvider",
     "org.apache.cassandra.locator.SeedProvider",
     "org.apache.cassandra.net.BackPressureStrategy",
@@ -134,7 +135,9 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.config.EncryptionOptions$ServerEncryptionOptionsCustomizer",
     "org.apache.cassandra.ConsoleAppenderBeanInfo",
     "org.apache.cassandra.ConsoleAppenderCustomizer",
-    "org.apache.cassandra.locator.InetAddressAndPort"
+    "org.apache.cassandra.locator.InetAddressAndPort",
+    "org.apache.cassandra.cql3.statements.schema.AlterKeyspaceStatement",
+    "org.apache.cassandra.cql3.statements.schema.CreateKeyspaceStatement"
     };
 
     static final Set<String> checkedClasses = new HashSet<>(Arrays.asList(validClasses));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 4a1a365..2fbbc28 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.index.SecondaryIndexManager;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.functions.FunctionName;
@@ -148,7 +149,7 @@ public abstract class CQLTester
         {
             @Override public String getRack(InetAddressAndPort endpoint) { return RACK1; }
             @Override public String getDatacenter(InetAddressAndPort endpoint) { return DATA_CENTER; }
-            @Override public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; }
+            @Override public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2) { return 0; }
         });
 
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
index 184c5ad..37605d6 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.AbstractEndpointSnitch;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.schema.SchemaKeyspace;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.schema.*;
@@ -527,7 +528,7 @@ public class CreateTest extends CQLTester
             public String getDatacenter(InetAddressAndPort endpoint) { return "us-east-1"; }
 
             @Override
-            public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; }
+            public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2) { return 0; }
         });
 
         // this forces the dc above to be added to the list of known datacenters (fixes static init problem


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org