You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:36:04 UTC

[12/18] cassandra git commit: Transient Replication and Cheap Quorums

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/SimpleStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
index 545ad28..7a000b7 100644
--- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java
+++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
@@ -21,9 +21,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.dht.Token;
 
@@ -36,34 +36,41 @@ import org.apache.cassandra.dht.Token;
  */
 public class SimpleStrategy extends AbstractReplicationStrategy
 {
+    private final ReplicationFactor rf;
+
     public SimpleStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
     {
         super(keyspaceName, tokenMetadata, snitch, configOptions);
+        this.rf = ReplicationFactor.fromString(this.configOptions.get("replication_factor"));
     }
 
-    public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+    public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
     {
-        int replicas = getReplicationFactor();
-        ArrayList<Token> tokens = metadata.sortedTokens();
-        List<InetAddressAndPort> endpoints = new ArrayList<InetAddressAndPort>(replicas);
+        ArrayList<Token> ring = metadata.sortedTokens();
+        if (ring.isEmpty())
+            return EndpointsForRange.empty(new Range<>(metadata.partitioner.getMinimumToken(), metadata.partitioner.getMinimumToken()));
+
+        Token replicaEnd = TokenMetadata.firstToken(ring, token);
+        Token replicaStart = metadata.getPredecessor(replicaEnd);
+        Range<Token> replicaRange = new Range<>(replicaStart, replicaEnd);
+        Iterator<Token> iter = TokenMetadata.ringIterator(ring, token, false);
 
-        if (tokens.isEmpty())
-            return endpoints;
+        EndpointsForRange.Builder replicas = EndpointsForRange.builder(replicaRange, rf.allReplicas);
 
         // Add the token at the index by default
-        Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token, false);
-        while (endpoints.size() < replicas && iter.hasNext())
+        while (replicas.size() < rf.allReplicas && iter.hasNext())
         {
-            InetAddressAndPort ep = metadata.getEndpoint(iter.next());
-            if (!endpoints.contains(ep))
-                endpoints.add(ep);
+            Token tk = iter.next();
+            InetAddressAndPort ep = metadata.getEndpoint(tk);
+            if (!replicas.containsEndpoint(ep))
+                replicas.add(new Replica(ep, replicaRange, replicas.size() < rf.fullReplicas));
         }
-        return endpoints;
+        return replicas.build();
     }
 
-    public int getReplicationFactor()
+    public ReplicationFactor getReplicationFactor()
     {
-        return Integer.parseInt(this.configOptions.get("replication_factor"));
+        return rf;
     }
 
     public void validateOptions() throws ConfigurationException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/SystemReplicas.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SystemReplicas.java b/src/java/org/apache/cassandra/locator/SystemReplicas.java
new file mode 100644
index 0000000..13a9d74
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/SystemReplicas.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+public class SystemReplicas
+{
+    private static final Map<InetAddressAndPort, Replica> systemReplicas = new ConcurrentHashMap<>();
+    public static final Range<Token> FULL_RANGE = new Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(),
+                                                              DatabaseDescriptor.getPartitioner().getMinimumToken());
+
+    private static Replica createSystemReplica(InetAddressAndPort endpoint)
+    {
+        return new Replica(endpoint, FULL_RANGE, true);
+    }
+
+    /**
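+     * There are a few places where a system function borrows write path functionality, but doesn't otherwise
+     * fit into normal replication strategies (i.e. hints and batchlog), so here we provide a full replica instance.
+     * @param endpoint the endpoint to obtain a system replica for
+     * @return the cached full {@link Replica} for {@code endpoint}, covering {@link #FULL_RANGE}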
+     */
+    public static Replica getSystemReplica(InetAddressAndPort endpoint)
+    {
+        return systemReplicas.computeIfAbsent(endpoint, SystemReplicas::createSystemReplica);
+    }
+
+    public static Collection<Replica> getSystemReplicas(Collection<InetAddressAndPort> endpoints)
+    {
+        List<Replica> replicas = new ArrayList<>(endpoints.size());
+        for (InetAddressAndPort endpoint: endpoints)
+        {
+            replicas.add(getSystemReplica(endpoint));
+        }
+        return replicas;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 46c191f..4ab34db 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.*;
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,6 +88,7 @@ public class TokenMetadata
     // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
     private final Set<InetAddressAndPort> leavingEndpoints = new HashSet<>();
     // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
+    // NOTE: this may contain ranges that conflict with those implied by sortedTokens when a range is changing its transient status
     private final ConcurrentMap<String, PendingRangeMaps> pendingRanges = new ConcurrentHashMap<String, PendingRangeMaps>();
 
     // nodes which are migrating to the new tokens in the ring
@@ -733,24 +735,20 @@ public class TokenMetadata
         return sortedTokens;
     }
 
-    public Multimap<Range<Token>, InetAddressAndPort> getPendingRangesMM(String keyspaceName)
+    public EndpointsByRange getPendingRangesMM(String keyspaceName)
     {
-        Multimap<Range<Token>, InetAddressAndPort> map = HashMultimap.create();
+        EndpointsByRange.Mutable byRange = new EndpointsByRange.Mutable();
         PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName);
 
         if (pendingRangeMaps != null)
         {
-            for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : pendingRangeMaps)
+            for (Map.Entry<Range<Token>, EndpointsForRange.Mutable> entry : pendingRangeMaps)
             {
-                Range<Token> range = entry.getKey();
-                for (InetAddressAndPort address : entry.getValue())
-                {
-                    map.put(range, address);
-                }
+                byRange.putAll(entry.getKey(), entry.getValue(), Conflict.ALL);
             }
         }
 
-        return map;
+        return byRange.asImmutableView();
     }
 
     /** a mutable map may be returned but caller should not modify it */
@@ -759,17 +757,18 @@ public class TokenMetadata
         return this.pendingRanges.get(keyspaceName);
     }
 
-    public List<Range<Token>> getPendingRanges(String keyspaceName, InetAddressAndPort endpoint)
+    public RangesAtEndpoint getPendingRanges(String keyspaceName, InetAddressAndPort endpoint)
     {
-        List<Range<Token>> ranges = new ArrayList<>();
-        for (Map.Entry<Range<Token>, InetAddressAndPort> entry : getPendingRangesMM(keyspaceName).entries())
+        RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(endpoint);
+        for (Map.Entry<Range<Token>, Replica> entry : getPendingRangesMM(keyspaceName).flattenEntries())
         {
-            if (entry.getValue().equals(endpoint))
+            Replica replica = entry.getValue();
+            if (replica.endpoint().equals(endpoint))
             {
-                ranges.add(entry.getKey());
+                builder.add(replica);
             }
         }
-        return ranges;
+        return builder.build();
     }
 
      /**
@@ -858,25 +857,27 @@ public class TokenMetadata
     {
         PendingRangeMaps newPendingRanges = new PendingRangeMaps();
 
-        Multimap<InetAddressAndPort, Range<Token>> addressRanges = strategy.getAddressRanges(metadata);
+        RangesByEndpoint addressRanges = strategy.getAddressReplicas(metadata);
 
         // Copy of metadata reflecting the situation after all leave operations are finished.
         TokenMetadata allLeftMetadata = removeEndpoints(metadata.cloneOnlyTokenMap(), leavingEndpoints);
 
         // get all ranges that will be affected by leaving nodes
-        Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
+        Set<Range<Token>> removeAffectedRanges = new HashSet<>();
         for (InetAddressAndPort endpoint : leavingEndpoints)
-            affectedRanges.addAll(addressRanges.get(endpoint));
+            removeAffectedRanges.addAll(addressRanges.get(endpoint).ranges());
 
         // for each of those ranges, find what new nodes will be responsible for the range when
         // all leaving nodes are gone.
-        for (Range<Token> range : affectedRanges)
+        for (Range<Token> range : removeAffectedRanges)
         {
-            Set<InetAddressAndPort> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
-            Set<InetAddressAndPort> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
-            for (InetAddressAndPort address : Sets.difference(newEndpoints, currentEndpoints))
+            EndpointsForRange currentReplicas = strategy.calculateNaturalReplicas(range.right, metadata);
+            EndpointsForRange newReplicas = strategy.calculateNaturalReplicas(range.right, allLeftMetadata);
+            for (Replica replica : newReplicas)
             {
-                newPendingRanges.addPendingRange(range, address);
+                if (currentReplicas.endpoints().contains(replica.endpoint()))
+                    continue;
+                newPendingRanges.addPendingRange(range, replica);
             }
         }
 
@@ -891,9 +892,9 @@ public class TokenMetadata
             Collection<Token> tokens = bootstrapAddresses.get(endpoint);
 
             allLeftMetadata.updateNormalTokens(tokens, endpoint);
-            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            for (Replica replica : strategy.getAddressReplicas(allLeftMetadata, endpoint))
             {
-                newPendingRanges.addPendingRange(range, endpoint);
+                newPendingRanges.addPendingRange(replica.range(), replica);
             }
             allLeftMetadata.removeEndpoint(endpoint);
         }
@@ -906,38 +907,43 @@ public class TokenMetadata
         for (Pair<Token, InetAddressAndPort> moving : movingEndpoints)
         {
             //Calculate all the ranges which will could be affected. This will include the ranges before and after the move.
-            Set<Range<Token>> moveAffectedRanges = new HashSet<>();
+            Set<Replica> moveAffectedReplicas = new HashSet<>();
             InetAddressAndPort endpoint = moving.right; // address of the moving node
             //Add ranges before the move
-            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            for (Replica replica : strategy.getAddressReplicas(allLeftMetadata, endpoint))
             {
-                moveAffectedRanges.add(range);
+                moveAffectedReplicas.add(replica);
             }
 
             allLeftMetadata.updateNormalToken(moving.left, endpoint);
             //Add ranges after the move
-            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            for (Replica replica : strategy.getAddressReplicas(allLeftMetadata, endpoint))
             {
-                moveAffectedRanges.add(range);
+                moveAffectedReplicas.add(replica);
             }
 
-            for(Range<Token> range : moveAffectedRanges)
+            for (Replica replica : moveAffectedReplicas)
             {
-                Set<InetAddressAndPort> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
-                Set<InetAddressAndPort> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+                Set<InetAddressAndPort> currentEndpoints = strategy.calculateNaturalReplicas(replica.range().right, metadata).endpoints();
+                Set<InetAddressAndPort> newEndpoints = strategy.calculateNaturalReplicas(replica.range().right, allLeftMetadata).endpoints();
                 Set<InetAddressAndPort> difference = Sets.difference(newEndpoints, currentEndpoints);
-                for(final InetAddressAndPort address : difference)
+                for (final InetAddressAndPort address : difference)
                 {
-                    Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
-                    Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
-                    //We want to get rid of any ranges which the node is currently getting.
-                    newRanges.removeAll(oldRanges);
+                    RangesAtEndpoint newReplicas = strategy.getAddressReplicas(allLeftMetadata, address);
+                    RangesAtEndpoint oldReplicas = strategy.getAddressReplicas(metadata, address);
 
-                    for(Range<Token> newRange : newRanges)
+                    // Filter out the things that are already replicated
+                    newReplicas = newReplicas.filter(r -> !oldReplicas.contains(r));
+                    for (Replica newReplica : newReplicas)
                     {
-                        for(Range<Token> pendingRange : newRange.subtractAll(oldRanges))
+                        // for correctness on write, we need to treat ranges that are becoming full differently
+                        // to those that are presently transient; however reads must continue to use the current view
+                        // for ranges that are becoming transient. We could choose to ignore them here, but it's probably
+                        // cleaner to ensure this is dealt with at point of use, where we can make a conscious decision
+                        // about which to use
+                        for (Replica pendingReplica : newReplica.subtractSameReplication(oldReplicas))
                         {
-                            newPendingRanges.addPendingRange(pendingRange, address);
+                            newPendingRanges.addPendingRange(pendingReplica.range(), pendingReplica);
                         }
                     }
                 }
@@ -1206,11 +1212,11 @@ public class TokenMetadata
         return sb.toString();
     }
 
-    public Collection<InetAddressAndPort> pendingEndpointsFor(Token token, String keyspaceName)
+    public EndpointsForToken pendingEndpointsForToken(Token token, String keyspaceName)
     {
         PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName);
         if (pendingRangeMaps == null)
-            return Collections.emptyList();
+            return EndpointsForToken.empty(token);
 
         return pendingRangeMaps.pendingEndpointsFor(token);
     }
@@ -1218,9 +1224,15 @@ public class TokenMetadata
     /**
      * @deprecated retained for benefit of old tests
      */
-    public Collection<InetAddressAndPort> getWriteEndpoints(Token token, String keyspaceName, Collection<InetAddressAndPort> naturalEndpoints)
+    public EndpointsForToken getWriteEndpoints(Token token, String keyspaceName, EndpointsForToken natural)
     {
-        return ImmutableList.copyOf(Iterables.concat(naturalEndpoints, pendingEndpointsFor(token, keyspaceName)));
+        EndpointsForToken pending = pendingEndpointsForToken(token, keyspaceName);
+        if (Endpoints.haveConflicts(natural, pending))
+        {
+            natural = Endpoints.resolveConflictsInNatural(natural, pending);
+            pending = Endpoints.resolveConflictsInPending(natural, pending);
+        }
+        return Endpoints.concat(natural, pending);
     }
 
     /** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 9e8d542..5a90804 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -102,6 +102,8 @@ public class KeyspaceMetrics
     public final Counter speculativeFailedRetries;
     /** Needed to speculate, but didn't have enough replicas **/
     public final Counter speculativeInsufficientReplicas;
+    /** Needed to write to a transient replica to satisfy quorum **/
+    public final Counter speculativeWrites;
     /** Number of started repairs as coordinator on this keyspace */
     public final Counter repairsStarted;
     /** Number of completed repairs as coordinator on this keyspace */
@@ -268,41 +270,12 @@ public class KeyspaceMetrics
         writeFailedIdealCL = Metrics.counter(factory.createMetricName("WriteFailedIdealCL"));
         idealCLWriteLatency = new LatencyMetrics(factory, "IdealCLWrite");
 
-        speculativeRetries = createKeyspaceCounter("SpeculativeRetries", new MetricValue()
-        {
-            public Long getValue(TableMetrics metric)
-            {
-                return metric.speculativeRetries.getCount();
-            }
-        });
-        speculativeFailedRetries = createKeyspaceCounter("SpeculativeFailedRetries", new MetricValue()
-        {
-            public Long getValue(TableMetrics metric)
-            {
-                return metric.speculativeFailedRetries.getCount();
-            }
-        });
-        speculativeInsufficientReplicas = createKeyspaceCounter("SpeculativeInsufficientReplicas", new MetricValue()
-        {
-            public Long getValue(TableMetrics metric)
-            {
-                return metric.speculativeInsufficientReplicas.getCount();
-            }
-        });
-        repairsStarted = createKeyspaceCounter("RepairJobsStarted", new MetricValue()
-        {
-            public Long getValue(TableMetrics metric)
-            {
-                return metric.repairsStarted.getCount();
-            }
-        });
-        repairsCompleted = createKeyspaceCounter("RepairJobsCompleted", new MetricValue()
-        {
-            public Long getValue(TableMetrics metric)
-            {
-                return metric.repairsCompleted.getCount();
-            }
-        });
+        speculativeRetries = createKeyspaceCounter("SpeculativeRetries", metric -> metric.speculativeRetries.getCount());
+        speculativeFailedRetries = createKeyspaceCounter("SpeculativeFailedRetries", metric -> metric.speculativeFailedRetries.getCount());
+        speculativeInsufficientReplicas = createKeyspaceCounter("SpeculativeInsufficientReplicas", metric -> metric.speculativeInsufficientReplicas.getCount());
+        speculativeWrites = createKeyspaceCounter("SpeculativeWrites", metric -> metric.speculativeWrites.getCount());
+        repairsStarted = createKeyspaceCounter("RepairJobsStarted", metric -> metric.repairsStarted.getCount());
+        repairsCompleted = createKeyspaceCounter("RepairJobsCompleted", metric -> metric.repairsCompleted.getCount());
         repairTime = Metrics.timer(factory.createMetricName("RepairTime"));
         repairPrepareTime = Metrics.timer(factory.createMetricName("RepairPrepareTime"));
         anticompactionTime = Metrics.timer(factory.createMetricName("AntiCompactionTime"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java b/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
index fe7673d..3d00b12 100644
--- a/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
@@ -37,6 +37,7 @@ public class ReadRepairMetrics
     @Deprecated
     public static final Meter attempted = Metrics.meter(factory.createMetricName("Attempted"));
 
+    // Incremented when additional requests were sent during blocking read repair due to unavailable or slow nodes
     public static final Meter speculatedRead = Metrics.meter(factory.createMetricName("SpeculatedRead"));
     public static final Meter speculatedWrite = Metrics.meter(factory.createMetricName("SpeculatedWrite"));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 49603ba..53ebcb0 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -214,6 +214,9 @@ public class TableMetrics
     public final Counter speculativeInsufficientReplicas;
     public final Gauge<Long> speculativeSampleLatencyNanos;
 
+    public final Counter speculativeWrites;
+    public final Gauge<Long> speculativeWriteLatencyNanos;
+
     public final static LatencyMetrics globalReadLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Read");
     public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
     public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
@@ -239,7 +242,7 @@ public class TableMetrics
             Keyspace k = Schema.instance.getKeyspaceInstance(keyspace);
             if (SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(k.getName()))
                 continue;
-            if (k.getReplicationStrategy().getReplicationFactor() < 2)
+            if (k.getReplicationStrategy().getReplicationFactor().allReplicas < 2)
                 continue;
 
             for (ColumnFamilyStore cf : k.getColumnFamilyStores())
@@ -825,13 +828,11 @@ public class TableMetrics
         speculativeRetries = createTableCounter("SpeculativeRetries");
         speculativeFailedRetries = createTableCounter("SpeculativeFailedRetries");
         speculativeInsufficientReplicas = createTableCounter("SpeculativeInsufficientReplicas");
-        speculativeSampleLatencyNanos = createTableGauge("SpeculativeSampleLatencyNanos", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                return cfs.sampleLatencyNanos;
-            }
-        });
+        speculativeSampleLatencyNanos = createTableGauge("SpeculativeSampleLatencyNanos", () -> cfs.sampleReadLatencyNanos);
+
+        speculativeWrites = createTableCounter("SpeculativeWrites");
+        speculativeWriteLatencyNanos = createTableGauge("SpeculativeWriteLatencyNanos", () -> cfs.transientWriteLatencyNanos);
+
         keyCacheHitRate = Metrics.register(factory.createMetricName("KeyCacheHitRate"),
                                            aliasFactory.createMetricName("KeyCacheHitRate"),
                                            new RatioGauge()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/net/IAsyncCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java b/src/java/org/apache/cassandra/net/IAsyncCallback.java
index 251d263..253b412 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallback.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java
@@ -21,6 +21,7 @@ import com.google.common.base.Predicate;
 
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 
 /**
  * implementors of IAsyncCallback need to make sure that any public methods
@@ -30,13 +31,9 @@ import org.apache.cassandra.locator.InetAddressAndPort;
  */
 public interface IAsyncCallback<T>
 {
-    Predicate<InetAddressAndPort> isAlive = new Predicate<InetAddressAndPort>()
-    {
-        public boolean apply(InetAddressAndPort endpoint)
-        {
-            return FailureDetector.instance.isAlive(endpoint);
-        }
-    };
+    final Predicate<InetAddressAndPort> isAlive = FailureDetector.instance::isAlive;
+
+    final Predicate<Replica> isReplicaAlive = replica -> isAlive.apply(replica.endpoint());
 
     /**
      * @param msg response received.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index c8fe3b7..bd290a1 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -90,6 +90,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.ILatencySubscriber;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.metrics.ConnectionMetrics;
 import org.apache.cassandra.metrics.DroppedMessageMetrics;
@@ -604,8 +605,9 @@ public final class MessagingService implements MessagingServiceMBean
 
                 if (expiredCallbackInfo.shouldHint())
                 {
-                    Mutation mutation = ((WriteCallbackInfo) expiredCallbackInfo).mutation();
-                    return StorageProxy.submitHint(mutation, expiredCallbackInfo.target, null);
+                    WriteCallbackInfo writeCallbackInfo = ((WriteCallbackInfo) expiredCallbackInfo);
+                    Mutation mutation = writeCallbackInfo.mutation();
+                    return StorageProxy.submitHint(mutation, writeCallbackInfo.getReplica(), null);
                 }
 
                 return null;
@@ -961,7 +963,7 @@ public final class MessagingService implements MessagingServiceMBean
         return verbHandlers.get(type);
     }
 
-    public int addCallback(IAsyncCallback cb, MessageOut message, InetAddressAndPort to, long timeout, boolean failureCallback)
+    public int addWriteCallback(IAsyncCallback cb, MessageOut message, InetAddressAndPort to, long timeout, boolean failureCallback)
     {
         assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
         int messageId = nextId();
@@ -970,12 +972,12 @@ public final class MessagingService implements MessagingServiceMBean
         return messageId;
     }
 
-    public int addCallback(IAsyncCallback cb,
-                           MessageOut<?> message,
-                           InetAddressAndPort to,
-                           long timeout,
-                           ConsistencyLevel consistencyLevel,
-                           boolean allowHints)
+    public int addWriteCallback(IAsyncCallback cb,
+                                MessageOut<?> message,
+                                Replica to,
+                                long timeout,
+                                ConsistencyLevel consistencyLevel,
+                                boolean allowHints)
     {
         assert message.verb == Verb.MUTATION
             || message.verb == Verb.COUNTER_MUTATION
@@ -1024,7 +1026,7 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public int sendRR(MessageOut message, InetAddressAndPort to, IAsyncCallback cb, long timeout, boolean failureCallback)
     {
-        int id = addCallback(cb, message, to, timeout, failureCallback);
+        int id = addWriteCallback(cb, message, to, timeout, failureCallback);
         updateBackPressureOnSend(to, cb, message);
         sendOneWay(failureCallback ? message.withParameter(ParameterType.FAILURE_CALLBACK, ONE_BYTE) : message, id, to);
         return id;
@@ -1042,14 +1044,14 @@ public final class MessagingService implements MessagingServiceMBean
      *                suggest that a timeout occurred to the invoker of the send().
      * @return an reference to message id used to match with the result
      */
-    public int sendRR(MessageOut<?> message,
-                      InetAddressAndPort to,
-                      AbstractWriteResponseHandler<?> handler,
-                      boolean allowHints)
-    {
-        int id = addCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel, allowHints);
-        updateBackPressureOnSend(to, handler, message);
-        sendOneWay(message.withParameter(ParameterType.FAILURE_CALLBACK, ONE_BYTE), id, to);
+    public int sendWriteRR(MessageOut<?> message,
+                           Replica to,
+                           AbstractWriteResponseHandler<?> handler,
+                           boolean allowHints)
+    {
+        int id = addWriteCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel(), allowHints);
+        updateBackPressureOnSend(to.endpoint(), handler, message);
+        sendOneWay(message.withParameter(ParameterType.FAILURE_CALLBACK, ONE_BYTE), id, to.endpoint());
         return id;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
index 41ac31b..c54e7dc 100644
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@ -21,7 +21,7 @@ package org.apache.cassandra.net;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.utils.FBUtilities;
@@ -30,24 +30,33 @@ public class WriteCallbackInfo extends CallbackInfo
 {
     // either a Mutation, or a Paxos Commit (MessageOut)
     private final Object mutation;
+    // the replica this write is addressed to; consulted by shouldHint()
+    private final Replica replica;
 
-    public WriteCallbackInfo(InetAddressAndPort target,
+    public WriteCallbackInfo(Replica replica,
                              IAsyncCallback callback,
                              MessageOut message,
                              IVersionedSerializer<?> serializer,
                              ConsistencyLevel consistencyLevel,
                              boolean allowHints)
     {
-        super(target, callback, serializer, true);
+        super(replica.endpoint(), callback, serializer, true);
         assert message != null;
         this.mutation = shouldHint(allowHints, message, consistencyLevel);
         //Local writes shouldn't go through messaging service (https://issues.apache.org/jira/browse/CASSANDRA-10477)
         assert (!target.equals(FBUtilities.getBroadcastAddressAndPort()));
+        this.replica = replica;
     }
 
     public boolean shouldHint()
     {
-        return mutation != null && StorageProxy.shouldHint(target);
+        return mutation != null && StorageProxy.shouldHint(replica);
+    }
+
+    /** Returns the replica this write was sent to; its endpoint is also this callback's {@code target}. */
+    public Replica getReplica()
+    {
+        return replica;
     }
 
     public Mutation mutation()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/AbstractSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AbstractSyncTask.java b/src/java/org/apache/cassandra/repair/AbstractSyncTask.java
new file mode 100644
index 0000000..124baa1
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/AbstractSyncTask.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.util.List;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Common base for repair sync tasks: a {@link Runnable} that is also a future whose result is a {@link SyncStat}.
+ */
+public abstract class AbstractSyncTask extends AbstractFuture<SyncStat> implements Runnable
+{
+    /**
+     * Begins synchronizing the given ranges. Whether that means streaming from this node or asking a remote
+     * node to stream is left to the subclass.
+     *
+     * @param rangesToStream the ranges to sync
+     */
+    protected abstract void startSync(List<Range<Token>> rangesToStream);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
index 2ca524f..eaf890a 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
@@ -18,12 +18,14 @@
 
 package org.apache.cassandra.repair;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamEvent;
@@ -54,8 +56,9 @@ public class AsymmetricLocalSyncTask extends AsymmetricSyncTask implements Strea
                                          previewKind)
                           .listeners(this)
                           .flushBeforeTransfer(pendingRepair == null)
-                          // request ranges from the remote node
-                          .requestRanges(fetchFrom, desc.keyspace, rangesToFetch, desc.columnFamily);
+                          // request ranges from the remote node, see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
+                          .requestRanges(fetchFrom, desc.keyspace, RangesAtEndpoint.toDummyList(rangesToFetch),
+                                  RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily);
         plan.execute();
 
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
index e24d854..2b171c9 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.SessionSummary;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTrees;
 
 public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements CompletableRemoteSyncTask
 {
@@ -37,6 +38,17 @@ public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements Comp
     {
         super(desc, fetchNode, fetchFrom, rangesToFetch, previewKind);
     }
+
+    /**
+     * Creates a task in which {@code to} fetches from {@code from} the ranges where their Merkle trees differ.
+     *
+     * @param to   tree response of the node that fetches (receives) the data
+     * @param from tree response of the node the data is fetched from
+     */
+    public AsymmetricRemoteSyncTask(RepairJobDesc desc, TreeResponse to, TreeResponse from, PreviewKind previewKind)
+    {
+        this(desc, to.endpoint, from.endpoint, MerkleTrees.difference(to.trees, from.trees), previewKind);
+    }
 
     public void startSync(List<Range<Token>> rangesToFetch)
     {
@@ -46,6 +51,7 @@ public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements Comp
         Tracing.traceRepair(message);
         MessagingService.instance().sendOneWay(request.createMessage(), request.fetchingNode);
     }
+
     public void syncComplete(boolean success, List<SessionSummary> summaries)
     {
         if (success)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
index 4d38e8a..35474af 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
@@ -21,8 +21,6 @@ package org.apache.cassandra.repair;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.util.concurrent.AbstractFuture;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,7 +31,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.tracing.Tracing;
 
-public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implements Runnable
+public abstract class AsymmetricSyncTask extends AbstractSyncTask
 {
     private static Logger logger = LoggerFactory.getLogger(AsymmetricSyncTask.class);
     protected final RepairJobDesc desc;
@@ -44,9 +42,9 @@ public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implem
     private long startTime = Long.MIN_VALUE;
     protected volatile SyncStat stat;
 
-
     public AsymmetricSyncTask(RepairJobDesc desc, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
     {
+        assert !fetchFrom.equals(fetchingNode) : "Fetching from self " + fetchFrom;
         this.desc = desc;
         this.fetchFrom = fetchFrom;
         this.fetchingNode = fetchingNode;
@@ -55,6 +53,7 @@ public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implem
         stat = new SyncStat(new NodePair(fetchingNode, fetchFrom), rangesToFetch.size());
         this.previewKind = previewKind;
     }
+
     public void run()
     {
         startTime = System.currentTimeMillis();
@@ -79,7 +78,4 @@ public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implem
         if (startTime != Long.MIN_VALUE)
             Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
     }
-
-
-    public abstract void startSync(List<Range<Token>> rangesToFetch);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/CommonRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/CommonRange.java b/src/java/org/apache/cassandra/repair/CommonRange.java
new file mode 100644
index 0000000..928e570
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/CommonRange.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/**
+ * Groups token ranges that share identical endpoints and identical transient endpoints, so that a single
+ * repair session can handle them together.
+ */
+public class CommonRange
+{
+    /** Endpoints that share {@link #ranges}; never empty, and a superset of {@link #transEndpoints}. */
+    public final ImmutableSet<InetAddressAndPort> endpoints;
+    /** The subset of {@link #endpoints} that hold {@link #ranges} transiently; may be empty. */
+    public final ImmutableSet<InetAddressAndPort> transEndpoints;
+    /** The grouped ranges; never empty. This is a private copy of the constructor argument. */
+    public final Collection<Range<Token>> ranges;
+
+    /**
+     * @param endpoints      endpoints sharing {@code ranges}; must be non-null and non-empty
+     * @param transEndpoints endpoints holding {@code ranges} transiently; must be non-null and a subset of {@code endpoints}
+     * @param ranges         the ranges to group; must be non-null and non-empty
+     * @throws IllegalArgumentException if any of these preconditions is violated
+     */
+    public CommonRange(Set<InetAddressAndPort> endpoints, Set<InetAddressAndPort> transEndpoints, Collection<Range<Token>> ranges)
+    {
+        Preconditions.checkArgument(endpoints != null && !endpoints.isEmpty());
+        Preconditions.checkArgument(transEndpoints != null);
+        Preconditions.checkArgument(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints");
+        Preconditions.checkArgument(ranges != null && !ranges.isEmpty());
+
+        this.endpoints = ImmutableSet.copyOf(endpoints);
+        this.transEndpoints = ImmutableSet.copyOf(transEndpoints);
+        // copy so later changes to the caller's collection cannot affect this instance (or its hashCode)
+        this.ranges = new ArrayList<>(ranges);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        CommonRange that = (CommonRange) o;
+
+        if (!endpoints.equals(that.endpoints)) return false;
+        if (!transEndpoints.equals(that.transEndpoints)) return false;
+        return ranges.equals(that.ranges);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = endpoints.hashCode();
+        result = 31 * result + transEndpoints.hashCode();
+        result = 31 * result + ranges.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "CommonRange{" +
+               "endpoints=" + endpoints +
+               ", transEndpoints=" + transEndpoints +
+               ", ranges=" + ranges +
+               '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java b/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java
index 8aa4381..bc614dc 100644
--- a/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java
+++ b/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java
@@ -25,8 +25,7 @@ import java.util.concurrent.ExecutorService;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.RangesAtEndpoint;
 
 /**
  * Keyspace level hook for repair.
@@ -38,5 +37,8 @@ public interface KeyspaceRepairManager
      * been notified that the repair session has been completed, the data associated with the given session id must
      * not be combined with repaired or unrepaired data, or data from other repair sessions.
      */
-    ListenableFuture prepareIncrementalRepair(UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor);
+    ListenableFuture prepareIncrementalRepair(UUID sessionID,
+                                              Collection<ColumnFamilyStore> tables,
+                                              RangesAtEndpoint tokenRanges,
+                                              ExecutorService executor);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
deleted file mode 100644
index d7e0387..0000000
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.util.List;
-import java.util.UUID;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.streaming.ProgressInfo;
-import org.apache.cassandra.streaming.StreamEvent;
-import org.apache.cassandra.streaming.StreamEventHandler;
-import org.apache.cassandra.streaming.StreamPlan;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.streaming.StreamOperation;
-import org.apache.cassandra.tracing.TraceState;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * LocalSyncTask performs streaming between local(coordinator) node and remote replica.
- */
-public class LocalSyncTask extends SyncTask implements StreamEventHandler
-{
-    private final TraceState state = Tracing.instance.get();
-
-    private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class);
-
-    private final UUID pendingRepair;
-    private final boolean pullRepair;
-
-    public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, UUID pendingRepair, boolean pullRepair, PreviewKind previewKind)
-    {
-        super(desc, r1, r2, previewKind);
-        this.pendingRepair = pendingRepair;
-        this.pullRepair = pullRepair;
-    }
-
-    @VisibleForTesting
-    StreamPlan createStreamPlan(InetAddressAndPort dst, List<Range<Token>> differences)
-    {
-        StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind)
-                          .listeners(this)
-                          .flushBeforeTransfer(pendingRepair == null)
-                          .requestRanges(dst, desc.keyspace, differences, desc.columnFamily);  // request ranges from the remote node
-        if (!pullRepair)
-        {
-            // send ranges to the remote node if we are not performing a pull repair
-            plan.transferRanges(dst, desc.keyspace, differences, desc.columnFamily);
-        }
-
-        return plan;
-    }
-
-    /**
-     * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
-     * that will be called out of band once the streams complete.
-     */
-    @Override
-    protected void startSync(List<Range<Token>> differences)
-    {
-        InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
-        // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding
-        InetAddressAndPort dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint;
-
-        String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst);
-        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
-        Tracing.traceRepair(message);
-
-        createStreamPlan(dst, differences).execute();
-    }
-
-    public void handleStreamEvent(StreamEvent event)
-    {
-        if (state == null)
-            return;
-        switch (event.eventType)
-        {
-            case STREAM_PREPARED:
-                StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event;
-                state.trace("Streaming session with {} prepared", spe.session.peer);
-                break;
-            case STREAM_COMPLETE:
-                StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event;
-                state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed");
-                break;
-            case FILE_PROGRESS:
-                ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress;
-                state.trace("{}/{} ({}%) {} idx:{}{}",
-                            new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes),
-                                           FBUtilities.prettyPrintMemory(pi.totalBytes),
-                                           pi.currentBytes * 100 / pi.totalBytes,
-                                           pi.direction == ProgressInfo.Direction.OUT ? "sent to" : "received from",
-                                           pi.sessionIndex,
-                                           pi.peer });
-        }
-    }
-
-    public void onSuccess(StreamState result)
-    {
-        String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily);
-        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
-        Tracing.traceRepair(message);
-        set(stat.withSummaries(result.createSummaries()));
-        finished();
-    }
-
-    public void onFailure(Throwable t)
-    {
-        setException(t);
-        finished();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
deleted file mode 100644
index 0a47f73..0000000
--- a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.RepairException;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.SyncRequest;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.streaming.SessionSummary;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * RemoteSyncTask sends {@link SyncRequest} to remote(non-coordinator) node
- * to repair(stream) data with other replica.
- *
- * When RemoteSyncTask receives SyncComplete from remote node, task completes.
- */
-public class RemoteSyncTask extends SyncTask implements CompletableRemoteSyncTask
-{
-    private static final Logger logger = LoggerFactory.getLogger(RemoteSyncTask.class);
-
-    public RemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind)
-    {
-        super(desc, r1, r2, previewKind);
-    }
-
-    @Override
-    protected void startSync(List<Range<Token>> differences)
-    {
-        InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
-        SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences, previewKind);
-        String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst);
-        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
-        Tracing.traceRepair(message);
-        MessagingService.instance().sendOneWay(request.createMessage(), request.src);
-    }
-
-    public void syncComplete(boolean success, List<SessionSummary> summaries)
-    {
-        if (success)
-        {
-            set(stat.withSummaries(summaries));
-        }
-        else
-        {
-            setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", r1.endpoint, r2.endpoint)));
-        }
-        finished();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 48973d2..d38435b 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -64,7 +64,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
     public RepairJob(RepairSession session, String columnFamily, boolean isIncremental, PreviewKind previewKind, boolean optimiseStreams)
     {
         this.session = session;
-        this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRanges());
+        this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.commonRange.ranges);
         this.taskExecutor = session.taskExecutor;
         this.parallelismDegree = session.parallelismDegree;
         this.isIncremental = isIncremental;
@@ -83,7 +83,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         Keyspace ks = Keyspace.open(desc.keyspace);
         ColumnFamilyStore cfs = ks.getColumnFamilyStore(desc.columnFamily);
         cfs.metric.repairsStarted.inc();
-        List<InetAddressAndPort> allEndpoints = new ArrayList<>(session.endpoints);
+        List<InetAddressAndPort> allEndpoints = new ArrayList<>(session.commonRange.endpoints);
         allEndpoints.add(FBUtilities.getBroadcastAddressAndPort());
 
         ListenableFuture<List<TreeResponse>> validations;
@@ -160,13 +160,19 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         }, taskExecutor);
     }
 
+    /** Whether {@code ep} is listed as a transient endpoint of this session's {@link CommonRange}. */
+    private boolean isTransient(InetAddressAndPort ep)
+    {
+        return session.commonRange.transEndpoints.contains(ep);
+    }
+
     private AsyncFunction<List<TreeResponse>, List<SyncStat>> standardSyncing()
     {
         return trees ->
         {
             InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
 
-            List<SyncTask> syncTasks = new ArrayList<>();
+            List<AbstractSyncTask> syncTasks = new ArrayList<>();
             // We need to difference all trees one against another
             for (int i = 0; i < trees.size() - 1; ++i)
             {
@@ -174,17 +179,29 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
                 for (int j = i + 1; j < trees.size(); ++j)
                 {
                     TreeResponse r2 = trees.get(j);
-                    SyncTask task;
+                    // transient replicas are never streamed to, so two of them have nothing to sync
+                    if (isTransient(r1.endpoint) && isTransient(r2.endpoint))
+                        continue;
+
+                    AbstractSyncTask task;
                     if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
                     {
-                        task = new LocalSyncTask(desc, r1, r2, isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind);
+                        InetAddressAndPort remote = r1.endpoint.equals(local) ? r2.endpoint : r1.endpoint;
+                        task = new SymmetricLocalSyncTask(desc, r1, r2, isTransient(remote), isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind);
+                    }
+                    else if (isTransient(r1.endpoint) || isTransient(r2.endpoint))
+                    {
+                        TreeResponse streamFrom = isTransient(r1.endpoint) ? r1 : r2;
+                        TreeResponse streamTo = isTransient(r1.endpoint) ? r2 : r1;
+                        task = new AsymmetricRemoteSyncTask(desc, streamTo, streamFrom, session.previewKind);
+                        session.waitForSync(Pair.create(desc, new NodePair(streamTo.endpoint, streamFrom.endpoint)), (AsymmetricRemoteSyncTask) task);
                     }
                     else
                     {
-                        task = new RemoteSyncTask(desc, r1, r2, session.previewKind);
-                        // RemoteSyncTask expects SyncComplete message sent back.
+                        task = new SymmetricRemoteSyncTask(desc, r1, r2, session.previewKind);
+                        // SymmetricRemoteSyncTask expects SyncComplete message sent back.
                         // Register task to RepairSession to receive response.
-                        session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (RemoteSyncTask) task);
+                        session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (SymmetricRemoteSyncTask) task);
                     }
                     syncTasks.add(task);
                     taskExecutor.submit(task);
@@ -200,7 +217,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         {
             InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
 
-            List<AsymmetricSyncTask> syncTasks = new ArrayList<>();
+            List<AbstractSyncTask> syncTasks = new ArrayList<>();
             // We need to difference all trees one against another
             DifferenceHolder diffHolder = new DifferenceHolder(trees);
 
@@ -215,6 +232,11 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
             for (int i = 0; i < trees.size(); i++)
             {
                 InetAddressAndPort address = trees.get(i).endpoint;
+
+                // we don't stream to transient replicas
+                if (isTransient(address))
+                    continue;
+
                 HostDifferences streamsFor = reducedDifferences.get(address);
                 if (streamsFor != null)
                 {
@@ -373,4 +395,4 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         }
         return Futures.allAsList(tasks);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 90c0146..8d3cd54 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.repair;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
@@ -29,8 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableCollection;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -38,8 +36,9 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.*;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.Replica;
 import org.apache.commons.lang3.time.DurationFormatUtils;
-import org.junit.internal.runners.statements.Fail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,7 +73,6 @@ import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.utils.progress.ProgressEvent;
@@ -141,46 +139,6 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
         recordFailure(message, completionMessage);
     }
 
-    @VisibleForTesting
-    static class CommonRange
-    {
-        public final Set<InetAddressAndPort> endpoints;
-        public final Collection<Range<Token>> ranges;
-
-        public CommonRange(Set<InetAddressAndPort> endpoints, Collection<Range<Token>> ranges)
-        {
-            Preconditions.checkArgument(endpoints != null && !endpoints.isEmpty());
-            Preconditions.checkArgument(ranges != null && !ranges.isEmpty());
-            this.endpoints = endpoints;
-            this.ranges = ranges;
-        }
-
-        public boolean equals(Object o)
-        {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            CommonRange that = (CommonRange) o;
-
-            if (!endpoints.equals(that.endpoints)) return false;
-            return ranges.equals(that.ranges);
-        }
-
-        public int hashCode()
-        {
-            int result = endpoints.hashCode();
-            result = 31 * result + ranges.hashCode();
-            return result;
-        }
-
-        public String toString()
-        {
-            return "CommonRange{" +
-                   "endpoints=" + endpoints +
-                   ", ranges=" + ranges +
-                   '}';
-        }
-    }
 
     protected void runMayThrow() throws Exception
     {
@@ -238,20 +196,24 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
         Set<InetAddressAndPort> allNeighbors = new HashSet<>();
         List<CommonRange> commonRanges = new ArrayList<>();
 
-        //pre-calculate output of getLocalRanges and pass it to getNeighbors to increase performance and prevent
+        //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent
         //calculation multiple times
-        Collection<Range<Token>> keyspaceLocalRanges = storageService.getLocalRanges(keyspace);
+        // we arbitrarily limit ourselves to only full replicas, in lieu of ensuring it is safe to coordinate from a transient replica
+        Iterable<Range<Token>> keyspaceLocalRanges = storageService
+                .getLocalReplicas(keyspace)
+                .filter(Replica::isFull)
+                .ranges();
 
         try
         {
             for (Range<Token> range : options.getRanges())
             {
-                Set<InetAddressAndPort> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
-                                                                                     options.getDataCenters(),
-                                                                                     options.getHosts());
+                EndpointsForRange neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
+                                                                        options.getDataCenters(),
+                                                                        options.getHosts());
 
                 addRangeToNeighbors(commonRanges, range, neighbors);
-                allNeighbors.addAll(neighbors);
+                allNeighbors.addAll(neighbors.endpoints());
             }
 
             progress.incrementAndGet();
@@ -387,11 +349,13 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
             for (CommonRange commonRange: commonRanges)
             {
                 Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains));
+                Set<InetAddressAndPort> transEndpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.transEndpoints, liveEndpoints::contains));
+                Preconditions.checkState(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints");
 
                 // this node is implicitly a participant in this repair, so a single endpoint is ok here
                 if (!endpoints.isEmpty())
                 {
-                    filtered.add(new CommonRange(endpoints, commonRange.ranges));
+                    filtered.add(new CommonRange(endpoints, transEndpoints, commonRange.ranges));
                 }
             }
             Preconditions.checkState(!filtered.isEmpty(), "Not enough live endpoints for a repair");
@@ -514,14 +478,13 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
         // we do endpoint filtering at the start of an incremental repair,
         // so repair sessions shouldn't also be checking liveness
         boolean force = options.isForcedRepair() && !isIncremental;
-        for (CommonRange cr : commonRanges)
+        for (CommonRange commonRange : commonRanges)
         {
-            logger.info("Starting RepairSession for {}", cr);
+            logger.info("Starting RepairSession for {}", commonRange);
             RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
-                                                                                     cr.ranges,
+                                                                                     commonRange,
                                                                                      keyspace,
                                                                                      options.getParallelism(),
-                                                                                     cr.endpoints,
                                                                                      isIncremental,
                                                                                      options.isPullRepair(),
                                                                                      force,
@@ -559,7 +522,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
         public void onSuccess(RepairSessionResult result)
         {
             String message = String.format("Repair session %s for range %s finished", session.getId(),
-                                           session.getRanges().toString());
+                                           session.ranges().toString());
             logger.info(message);
             fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS,
                                                 progress.incrementAndGet(),
@@ -572,7 +535,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
             StorageMetrics.repairExceptions.inc();
 
             String message = String.format("Repair session %s for range %s failed with error %s",
-                                           session.getId(), session.getRanges().toString(), t.getMessage());
+                                           session.getId(), session.ranges().toString(), t.getMessage());
             logger.error(message, t);
             fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR,
                                                 progress.incrementAndGet(),
@@ -684,13 +647,15 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                                                ImmutableList.of(failureMessage, completionMessage));
     }
 
-    private void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, Set<InetAddressAndPort> neighbors)
+    private void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors)
     {
+        Set<InetAddressAndPort> endpoints = neighbors.endpoints();
+        Set<InetAddressAndPort> transEndpoints = neighbors.filter(Replica::isTransient).endpoints();
         for (int i = 0; i < neighborRangeList.size(); i++)
         {
             CommonRange cr = neighborRangeList.get(i);
 
-            if (cr.endpoints.containsAll(neighbors))
+            if (cr.endpoints.containsAll(endpoints) && cr.transEndpoints.containsAll(transEndpoints))
             {
                 cr.ranges.add(range);
                 return;
@@ -699,7 +664,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
 
         List<Range<Token>> ranges = new ArrayList<>();
         ranges.add(range);
-        neighborRangeList.add(new CommonRange(neighbors, ranges));
+        neighborRangeList.add(new CommonRange(endpoints, transEndpoints, ranges));
     }
 
     private Thread createQueryThread(final int cmd, final UUID sessionId)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index ec06f37..2ff60ec 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -55,8 +55,8 @@ import org.apache.cassandra.utils.Pair;
  *      validationComplete()).
  *   </li>
  *   <li>Synchronization phase: once all trees are received, the job compares each tree with
- *      all the other using a so-called {@link SyncTask}. If there is difference between 2 trees, the
- *      concerned SyncTask will start a streaming of the difference between the 2 endpoint concerned.
+ *      all the other using a so-called {@link SymmetricSyncTask}. If there is difference between 2 trees, the
+ *      concerned SymmetricSyncTask will start a streaming of the difference between the 2 endpoint concerned.
  *   </li>
  * </ol>
  * The job is done once all its SyncTasks are done (i.e. have either computed no differences
@@ -74,7 +74,7 @@ import org.apache.cassandra.utils.Pair;
  * we still first send a message to each node to flush and snapshot data so each merkle tree
  * creation is still done on similar data, even if the actual creation is not
  * done simulatneously). If not sequential, all merkle tree are requested in parallel.
- * Similarly, if a job is sequential, it will handle one SyncTask at a time, but will handle
+ * Similarly, if a job is sequential, it will handle one SymmetricSyncTask at a time, but will handle
  * all of them in parallel otherwise.
  */
 public class RepairSession extends AbstractFuture<RepairSessionResult> implements IEndpointStateChangeSubscriber,
@@ -94,8 +94,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     public final boolean skippedReplicas;
 
     /** Range to repair */
-    public final Collection<Range<Token>> ranges;
-    public final Set<InetAddressAndPort> endpoints;
+    public final CommonRange commonRange;
     public final boolean isIncremental;
     public final PreviewKind previewKind;
 
@@ -114,23 +113,20 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
 
     /**
      * Create new repair session.
-     *
      * @param parentRepairSession the parent sessions id
      * @param id this sessions id
-     * @param ranges ranges to repair
+     * @param commonRange ranges to repair
      * @param keyspace name of keyspace
      * @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees
-     * @param endpoints the data centers that should be part of the repair; null for all DCs
      * @param pullRepair true if the repair should be one way (from remote host to this host and only applicable between two hosts--see RepairOption)
      * @param force true if the repair should ignore dead endpoints (instead of failing)
      * @param cfnames names of columnfamilies
      */
     public RepairSession(UUID parentRepairSession,
                          UUID id,
-                         Collection<Range<Token>> ranges,
+                         CommonRange commonRange,
                          String keyspace,
                          RepairParallelism parallelismDegree,
-                         Set<InetAddressAndPort> endpoints,
                          boolean isIncremental,
                          boolean pullRepair,
                          boolean force,
@@ -145,7 +141,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         this.parallelismDegree = parallelismDegree;
         this.keyspace = keyspace;
         this.cfnames = cfnames;
-        this.ranges = ranges;
 
         //If force then filter out dead endpoints
         boolean forceSkippedReplicas = false;
@@ -153,7 +148,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         {
             logger.debug("force flag set, removing dead endpoints");
             final Set<InetAddressAndPort> removeCandidates = new HashSet<>();
-            for (final InetAddressAndPort endpoint : endpoints)
+            for (final InetAddressAndPort endpoint : commonRange.endpoints)
             {
                 if (!FailureDetector.instance.isAlive(endpoint))
                 {
@@ -166,12 +161,13 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
                 // we shouldn't be recording a successful repair if
                 // any replicas are excluded from the repair
                 forceSkippedReplicas = true;
-                endpoints = new HashSet<>(endpoints);
-                endpoints.removeAll(removeCandidates);
+                Set<InetAddressAndPort> filteredEndpoints = new HashSet<>(commonRange.endpoints);
+                filteredEndpoints.removeAll(removeCandidates);
+                commonRange = new CommonRange(filteredEndpoints, commonRange.transEndpoints, commonRange.ranges);
             }
         }
 
-        this.endpoints = endpoints;
+        this.commonRange = commonRange;
         this.isIncremental = isIncremental;
         this.previewKind = previewKind;
         this.pullRepair = pullRepair;
@@ -184,9 +180,14 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         return id;
     }
 
-    public Collection<Range<Token>> getRanges()
+    public Collection<Range<Token>> ranges()
+    {
+        return commonRange.ranges;
+    }
+
+    public Collection<InetAddressAndPort> endpoints()
     {
-        return ranges;
+        return commonRange.endpoints;
     }
 
     public void waitForValidation(Pair<RepairJobDesc, InetAddressAndPort> key, ValidationTask task)
@@ -247,7 +248,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     {
         StringBuilder sb = new StringBuilder();
         sb.append(FBUtilities.getBroadcastAddressAndPort());
-        for (InetAddressAndPort ep : endpoints)
+        for (InetAddressAndPort ep : commonRange.endpoints)
             sb.append(", ").append(ep);
         return sb.toString();
     }
@@ -266,18 +267,18 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         if (terminated)
             return;
 
-        logger.info("{} new session: will sync {} on range {} for {}.{}", previewKind.logPrefix(getId()), repairedNodes(), ranges, keyspace, Arrays.toString(cfnames));
-        Tracing.traceRepair("Syncing range {}", ranges);
+        logger.info("{} new session: will sync {} on range {} for {}.{}", previewKind.logPrefix(getId()), repairedNodes(), commonRange, keyspace, Arrays.toString(cfnames));
+        Tracing.traceRepair("Syncing range {}", commonRange);
         if (!previewKind.isPreview())
         {
-            SystemDistributedKeyspace.startRepairs(getId(), parentRepairSession, keyspace, cfnames, ranges, endpoints);
+            SystemDistributedKeyspace.startRepairs(getId(), parentRepairSession, keyspace, cfnames, commonRange);
         }
 
-        if (endpoints.isEmpty())
+        if (commonRange.endpoints.isEmpty())
         {
-            logger.info("{} {}", previewKind.logPrefix(getId()), message = String.format("No neighbors to repair with on range %s: session completed", ranges));
+            logger.info("{} {}", previewKind.logPrefix(getId()), message = String.format("No neighbors to repair with on range %s: session completed", commonRange));
             Tracing.traceRepair(message);
-            set(new RepairSessionResult(id, keyspace, ranges, Lists.<RepairResult>newArrayList(), skippedReplicas));
+            set(new RepairSessionResult(id, keyspace, commonRange.ranges, Lists.<RepairResult>newArrayList(), skippedReplicas));
             if (!previewKind.isPreview())
             {
                 SystemDistributedKeyspace.failRepairs(getId(), keyspace, cfnames, new RuntimeException(message));
@@ -286,7 +287,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         }
 
         // Checking all nodes are live
-        for (InetAddressAndPort endpoint : endpoints)
+        for (InetAddressAndPort endpoint : commonRange.endpoints)
         {
             if (!FailureDetector.instance.isAlive(endpoint) && !skippedReplicas)
             {
@@ -318,8 +319,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
             {
                 // this repair session is completed
                 logger.info("{} {}", previewKind.logPrefix(getId()), "Session completed successfully");
-                Tracing.traceRepair("Completed sync of range {}", ranges);
-                set(new RepairSessionResult(id, keyspace, ranges, results, skippedReplicas));
+                Tracing.traceRepair("Completed sync of range {}", commonRange);
+                set(new RepairSessionResult(id, keyspace, commonRange.ranges, results, skippedReplicas));
 
                 taskExecutor.shutdown();
                 // mark this session as terminated
@@ -372,7 +373,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
 
     public void convict(InetAddressAndPort endpoint, double phi)
     {
-        if (!endpoints.contains(endpoint))
+        if (!commonRange.endpoints.contains(endpoint))
             return;
 
         // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 5d2b2ec..e9cba89 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.Collection;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,9 +79,12 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
         StreamPlan sp = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind)
                .listeners(this)
                .flushBeforeTransfer(pendingRepair == null) // sstables are isolated at the beginning of an incremental repair session, so flushing isn't neccessary
-               .requestRanges(dest, desc.keyspace, ranges, desc.columnFamily); // request ranges from the remote node
+               // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
+               .requestRanges(dest, desc.keyspace, RangesAtEndpoint.toDummyList(ranges),
+                       RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily); // request ranges from the remote node
         if (!asymmetric)
-            sp.transferRanges(dest, desc.keyspace, ranges, desc.columnFamily); // send ranges to the remote node
+            // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
+            sp.transferRanges(dest, desc.keyspace, RangesAtEndpoint.toDummyList(ranges), desc.columnFamily); // send ranges to the remote node
         return sp;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org