You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2017/12/07 12:58:28 UTC

[2/2] cassandra git commit: Add option to optimize Merkle tree comparison across replicas

Add option to optimize Merkle tree comparison across replicas

Patch by marcuse; reviewed by Blake Eggleston for CASSANDRA-3200


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cb56d9fc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cb56d9fc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cb56d9fc

Branch: refs/heads/trunk
Commit: cb56d9fc3c773abbefa2044ce41ddbfb7717e0cb
Parents: a6f3983
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Dec 7 13:55:44 2017 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Thu Dec 7 13:55:56 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   4 +
 .../repair/AsymmetricLocalSyncTask.java         | 105 +++++
 .../repair/AsymmetricRemoteSyncTask.java        |  60 +++
 .../cassandra/repair/AsymmetricSyncTask.java    |  85 ++++
 .../repair/CompletableRemoteSyncTask.java       |  28 ++
 .../apache/cassandra/repair/LocalSyncTask.java  |   1 -
 .../apache/cassandra/repair/RemoteSyncTask.java |   2 +-
 .../org/apache/cassandra/repair/RepairJob.java  | 137 ++++--
 .../repair/RepairMessageVerbHandler.java        |  24 +-
 .../apache/cassandra/repair/RepairRunnable.java |   1 +
 .../apache/cassandra/repair/RepairSession.java  |  12 +-
 .../cassandra/repair/StreamingRepairTask.java   |  36 +-
 .../repair/asymmetric/DifferenceHolder.java     |  98 +++++
 .../repair/asymmetric/HostDifferences.java      |  83 ++++
 .../asymmetric/IncomingRepairStreamTracker.java |  81 ++++
 .../repair/asymmetric/PreferedNodeFilter.java   |  27 ++
 .../repair/asymmetric/RangeDenormalizer.java    | 125 ++++++
 .../repair/asymmetric/ReduceHelper.java         | 137 ++++++
 .../repair/asymmetric/StreamFromOptions.java    | 109 +++++
 .../repair/messages/AsymmetricSyncRequest.java  | 132 ++++++
 .../repair/messages/RepairMessage.java          |   3 +-
 .../cassandra/repair/messages/RepairOption.java |  24 +-
 .../cassandra/service/ActiveRepairService.java  |   4 +-
 .../apache/cassandra/tools/nodetool/Repair.java |   6 +-
 .../cassandra/repair/LocalSyncTaskTest.java     |   2 +-
 .../cassandra/repair/RepairSessionTest.java     |   2 +-
 .../repair/StreamingRepairTaskTest.java         |   5 +-
 .../repair/asymmetric/DifferenceHolderTest.java | 106 +++++
 .../asymmetric/RangeDenormalizerTest.java       |  86 ++++
 .../repair/asymmetric/ReduceHelperTest.java     | 425 +++++++++++++++++++
 .../asymmetric/StreamFromOptionsTest.java       | 124 ++++++
 .../apache/cassandra/utils/MerkleTreesTest.java |   2 +-
 33 files changed, 2013 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 34af97d..ef414b9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add option to optimise merkle tree comparison across replicas (CASSANDRA-3200)
  * Remove unused and deprecated methods from AbstractCompactionStrategy (CASSANDRA-14081)
  * Fix Distribution.average in cassandra-stress (CASSANDRA-14090)
  * Support a means of logging all queries as they were invoked (CASSANDRA-13983)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index a14f7ba..43f57f2 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,10 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - An experimental option to compare all merkle trees together has been added - for example, in
+     a 3 node cluster with 2 replicas identical and 1 out-of-date, with this option enabled, the
+     out-of-date replica will only stream a single copy from an up-to-date replica. Enable it by adding
+     "-os" to nodetool repair. See CASSANDRA-3200.
    - The currentTimestamp, currentDate, currentTime and currentTimeUUID functions have been added.
      See CASSANDRA-13132
    - Support for arithmetic operations between `timestamp`/`date` and `duration` has been added.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
new file mode 100644
index 0000000..5464520
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class AsymmetricLocalSyncTask extends AsymmetricSyncTask implements StreamEventHandler
+{
+    private final UUID pendingRepair;
+    private final TraceState state = Tracing.instance.get();
+
+    public AsymmetricLocalSyncTask(RepairJobDesc desc, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, UUID pendingRepair, PreviewKind previewKind)
+    {
+        super(desc, FBUtilities.getBroadcastAddress(), fetchFrom, rangesToFetch, previewKind);
+        this.pendingRepair = pendingRepair;
+    }
+
+    public void startSync(List<Range<Token>> rangesToFetch)
+    {
+        InetAddress preferred = SystemKeyspace.getPreferredIP(fetchFrom);
+        StreamPlan plan = new StreamPlan(StreamOperation.REPAIR,
+                                         1, false,
+                                         false,
+                                         pendingRepair,
+                                         previewKind)
+                          .listeners(this)
+                          .flushBeforeTransfer(pendingRepair == null)
+                          // request ranges from the remote node
+                          .requestRanges(fetchFrom, preferred, desc.keyspace, rangesToFetch, desc.columnFamily);
+        plan.execute();
+
+    }
+
+    public void handleStreamEvent(StreamEvent event)
+    {
+        if (state == null)
+            return;
+        switch (event.eventType)
+        {
+            case STREAM_PREPARED:
+                StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event;
+                state.trace("Streaming session with {} prepared", spe.session.peer);
+                break;
+            case STREAM_COMPLETE:
+                StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event;
+                state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed");
+                break;
+            case FILE_PROGRESS:
+                ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress;
+                state.trace("{}/{} ({}%) {} idx:{}{}",
+                            new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes),
+                                           FBUtilities.prettyPrintMemory(pi.totalBytes),
+                                           pi.currentBytes * 100 / pi.totalBytes,
+                                           pi.direction == ProgressInfo.Direction.OUT ? "sent to" : "received from",
+                                           pi.sessionIndex,
+                                           pi.peer });
+        }
+    }
+
+    public void onSuccess(StreamState result)
+    {
+        String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, fetchingNode, fetchFrom, desc.columnFamily);
+        Tracing.traceRepair(message);
+        set(stat);
+        finished();
+    }
+
+    public void onFailure(Throwable t)
+    {
+        setException(t);
+        finished();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
new file mode 100644
index 0000000..d70975d
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.List;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RepairException;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.AsymmetricSyncRequest;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.SessionSummary;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * {@link AsymmetricSyncTask} for the case where the fetching node is not this node. The actual streaming is
+ * delegated by sending an {@link AsymmetricSyncRequest} to the fetching node; this task's future is completed
+ * later through {@link #syncComplete(boolean, List)}.
+ */
+public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements CompletableRemoteSyncTask
+{
+    /**
+     * @param desc          description of the repair job this sync belongs to
+     * @param fetchNode     node that should fetch the ranges
+     * @param fetchFrom     node the ranges should be fetched from
+     * @param rangesToFetch ranges {@code fetchNode} should fetch from {@code fetchFrom}
+     * @param previewKind   preview kind forwarded in the request
+     */
+    public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddress fetchNode, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
+    {
+        super(desc, fetchNode, fetchFrom, rangesToFetch, previewKind);
+    }
+
+    /**
+     * Sends a one-way {@link AsymmetricSyncRequest} to the fetching node, naming this node (broadcast
+     * address) as the initiator. No reply is awaited here.
+     */
+    public void startSync(List<Range<Token>> rangesToFetch)
+    {
+        InetAddress local = FBUtilities.getBroadcastAddress();
+        AsymmetricSyncRequest request = new AsymmetricSyncRequest(desc, local, fetchingNode, fetchFrom, rangesToFetch, previewKind);
+        String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.fetchingNode, request.fetchFrom);
+        Tracing.traceRepair(message);
+        MessagingService.instance().sendOneWay(request.createMessage(), request.fetchingNode);
+    }
+
+    /**
+     * Completes this task with the outcome of the delegated sync.
+     *
+     * @param success   whether the delegated sync succeeded
+     * @param summaries session summaries attached to the published {@code SyncStat} on success
+     */
+    public void syncComplete(boolean success, List<SessionSummary> summaries)
+    {
+        if (success)
+        {
+            set(stat.withSummaries(summaries));
+        }
+        else
+        {
+            setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", fetchingNode, fetchFrom)));
+        }
+        // record the sync time for remote syncs too, as AsymmetricLocalSyncTask does on both outcomes
+        finished();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
new file mode 100644
index 0000000..fe00058
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.tracing.Tracing;
+
+/**
+ * Base class for one-directional ("asymmetric") sync tasks: {@code fetchingNode} fetches
+ * {@code rangesToFetch} from {@code fetchFrom}. The task is a future of {@link SyncStat}; subclasses
+ * implement {@link #startSync(List)} and are responsible for completing the future once a sync was started.
+ */
+public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implements Runnable
+{
+    private static final Logger logger = LoggerFactory.getLogger(AsymmetricSyncTask.class);
+    protected final RepairJobDesc desc;
+    protected final InetAddress fetchFrom;
+    protected final List<Range<Token>> rangesToFetch;
+    protected final InetAddress fetchingNode;
+    protected final PreviewKind previewKind;
+    // Long.MIN_VALUE means "run() has not been called". Written by run() and read by finished(), which
+    // subclasses call from completion callbacks that may run on another thread, hence volatile.
+    private volatile long startTime = Long.MIN_VALUE;
+    protected volatile SyncStat stat;
+
+
+    /**
+     * @param desc          description of the repair job this sync belongs to
+     * @param fetchingNode  node that fetches the ranges
+     * @param fetchFrom     node the ranges are fetched from
+     * @param rangesToFetch ranges to fetch; must not be null, an empty list means the nodes are consistent
+     * @param previewKind   preview kind, also used to build the log prefix
+     */
+    public AsymmetricSyncTask(RepairJobDesc desc, InetAddress fetchingNode, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
+    {
+        this.desc = desc;
+        this.fetchFrom = fetchFrom;
+        this.fetchingNode = fetchingNode;
+        this.rangesToFetch = rangesToFetch;
+        // todo: make an AsymmetricSyncStat?
+        stat = new SyncStat(new NodePair(fetchingNode, fetchFrom), rangesToFetch.size());
+        this.previewKind = previewKind;
+    }
+
+    /**
+     * Records the start time, then either completes immediately with {@code stat} when there is nothing to
+     * fetch, or hands the ranges to {@link #startSync(List)}.
+     */
+    public void run()
+    {
+        startTime = System.currentTimeMillis();
+        if (rangesToFetch.isEmpty())
+        {
+            logger.info(statusMessage("are consistent"));
+            Tracing.traceRepair("Endpoint {} is consistent with {} for {}", fetchingNode, fetchFrom, desc.columnFamily);
+            set(stat);
+            return;
+        }
+
+        // non-0 difference: perform streaming repair
+        logger.info(statusMessage("have " + rangesToFetch.size() + " range(s) out of sync"));
+        Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", fetchingNode, rangesToFetch.size(), fetchFrom, desc.columnFamily);
+        startSync(rangesToFetch);
+    }
+
+    /**
+     * Builds the log line "&lt;prefix&gt; Endpoints A and B &lt;status&gt; for &lt;table&gt;" in a single formatting
+     * pass, so that a '%' inside any interpolated value (e.g. an IPv6 scope id) is never re-interpreted as a
+     * format specifier.
+     */
+    private String statusMessage(String status)
+    {
+        return String.format("%s Endpoints %s and %s %s for %s", previewKind.logPrefix(desc.sessionId), fetchingNode, fetchFrom, status, desc.columnFamily);
+    }
+
+    /**
+     * Updates the table's {@code syncTime} metric with the time elapsed since {@link #run()} started.
+     * Does nothing if {@link #run()} was never called.
+     */
+    protected void finished()
+    {
+        if (startTime != Long.MIN_VALUE)
+            Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
+    }
+
+
+    /**
+     * Starts fetching the given (non-empty) ranges; called from {@link #run()}.
+     */
+    public abstract void startSync(List<Range<Token>> rangesToFetch);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/CompletableRemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/CompletableRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/CompletableRemoteSyncTask.java
new file mode 100644
index 0000000..c4fe6c8
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/CompletableRemoteSyncTask.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.util.List;
+
+import org.apache.cassandra.streaming.SessionSummary;
+
+/**
+ * A sync task whose streaming is carried out by another node and which is therefore completed from the
+ * outside once the outcome of that sync is known.
+ */
+public interface CompletableRemoteSyncTask
+{
+    /**
+     * Signals the outcome of the delegated sync.
+     *
+     * @param success   whether the sync succeeded
+     * @param summaries session summaries of the sync
+     */
+    void syncComplete(boolean success, List<SessionSummary> summaries);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 343950b..8545b22 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -61,7 +61,6 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
         this.pullRepair = pullRepair;
     }
 
-
     @VisibleForTesting
     StreamPlan createStreamPlan(InetAddress dst, InetAddress preferred, List<Range<Token>> differences)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
index 6cc786e..93feb72 100644
--- a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
@@ -39,7 +39,7 @@ import org.apache.cassandra.utils.FBUtilities;
  *
  * When RemoteSyncTask receives SyncComplete from remote node, task completes.
  */
-public class RemoteSyncTask extends SyncTask
+public class RemoteSyncTask extends SyncTask implements CompletableRemoteSyncTask
 {
     private static final Logger logger = LoggerFactory.getLogger(RemoteSyncTask.class);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index d0654bd..7b8eb92 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -19,7 +19,9 @@ package org.apache.cassandra.repair;
 
 import java.net.InetAddress;
 import java.util.*;
+import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,7 +29,13 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.repair.asymmetric.DifferenceHolder;
+import org.apache.cassandra.repair.asymmetric.HostDifferences;
+import org.apache.cassandra.repair.asymmetric.PreferedNodeFilter;
+import org.apache.cassandra.repair.asymmetric.ReduceHelper;
 import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -45,6 +53,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
     private final ListeningExecutorService taskExecutor;
     private final boolean isIncremental;
     private final PreviewKind previewKind;
+    private final boolean optimiseStreams;
 
     /**
      * Create repair job to run on specific columnfamily
@@ -52,7 +61,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
      * @param session RepairSession that this RepairJob belongs
      * @param columnFamily name of the ColumnFamily to repair
      */
-    public RepairJob(RepairSession session, String columnFamily, boolean isIncremental, PreviewKind previewKind)
+    public RepairJob(RepairSession session, String columnFamily, boolean isIncremental, PreviewKind previewKind, boolean optimiseStreams)
     {
         this.session = session;
         this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRanges());
@@ -60,6 +69,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         this.parallelismDegree = session.parallelismDegree;
         this.isIncremental = isIncremental;
         this.previewKind = previewKind;
+        this.optimiseStreams = optimiseStreams;
     }
 
     /**
@@ -118,39 +128,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         }
 
         // When all validations complete, submit sync tasks
-        ListenableFuture<List<SyncStat>> syncResults = Futures.transformAsync(validations, new AsyncFunction<List<TreeResponse>, List<SyncStat>>()
-        {
-            public ListenableFuture<List<SyncStat>> apply(List<TreeResponse> trees)
-            {
-                InetAddress local = FBUtilities.getLocalAddress();
-
-                List<SyncTask> syncTasks = new ArrayList<>();
-                // We need to difference all trees one against another
-                for (int i = 0; i < trees.size() - 1; ++i)
-                {
-                    TreeResponse r1 = trees.get(i);
-                    for (int j = i + 1; j < trees.size(); ++j)
-                    {
-                        TreeResponse r2 = trees.get(j);
-                        SyncTask task;
-                        if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
-                        {
-                            task = new LocalSyncTask(desc, r1, r2, isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind);
-                        }
-                        else
-                        {
-                            task = new RemoteSyncTask(desc, r1, r2, session.previewKind);
-                            // RemoteSyncTask expects SyncComplete message sent back.
-                            // Register task to RepairSession to receive response.
-                            session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (RemoteSyncTask) task);
-                        }
-                        syncTasks.add(task);
-                        taskExecutor.submit(task);
-                    }
-                }
-                return Futures.allAsList(syncTasks);
-            }
-        }, taskExecutor);
+        ListenableFuture<List<SyncStat>> syncResults = Futures.transformAsync(validations, optimiseStreams && !session.pullRepair ? optimisedSyncing() : standardSyncing(), taskExecutor);
 
         // When all sync complete, set the final result
         Futures.addCallback(syncResults, new FutureCallback<List<SyncStat>>()
@@ -182,6 +160,97 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         }, taskExecutor);
     }
 
+    /**
+     * Sync strategy that compares every pair of validation responses and submits one {@link SyncTask} per
+     * pair: a {@link LocalSyncTask} when the local address is one of the two endpoints, otherwise a
+     * {@link RemoteSyncTask} that is also registered with the session.
+     *
+     * @return function mapping the collected tree responses to the combined future of all submitted tasks
+     */
+    private AsyncFunction<List<TreeResponse>, List<SyncStat>> standardSyncing()
+    {
+        return trees ->
+        {
+            InetAddress localAddress = FBUtilities.getLocalAddress();
+            int treeCount = trees.size();
+            List<SyncTask> submitted = new ArrayList<>();
+
+            // visit each unordered pair (first, second) with first < second exactly once
+            for (int first = 0; first < treeCount - 1; ++first)
+            {
+                TreeResponse left = trees.get(first);
+                for (int second = first + 1; second < treeCount; ++second)
+                {
+                    TreeResponse right = trees.get(second);
+                    boolean involvesLocalNode = left.endpoint.equals(localAddress) || right.endpoint.equals(localAddress);
+
+                    SyncTask task;
+                    if (!involvesLocalNode)
+                    {
+                        RemoteSyncTask remote = new RemoteSyncTask(desc, left, right, session.previewKind);
+                        // RemoteSyncTask expects a SyncComplete message to be sent back, so it has to be
+                        // registered with the RepairSession to receive that response.
+                        session.waitForSync(Pair.create(desc, new NodePair(left.endpoint, right.endpoint)), remote);
+                        task = remote;
+                    }
+                    else
+                    {
+                        task = new LocalSyncTask(desc, left, right, isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind);
+                    }
+
+                    submitted.add(task);
+                    taskExecutor.submit(task);
+                }
+            }
+            return Futures.allAsList(submitted);
+        };
+    }
+
+    /**
+     * Sync strategy used when stream optimisation is enabled. The differences between all trees are
+     * collected in a {@link DifferenceHolder} and passed through {@code ReduceHelper.reduce} together with a
+     * filter that keeps only candidates in the same datacenter as the streaming node. For every endpoint
+     * that has an entry in the reduced result, one {@link AsymmetricSyncTask} is submitted per host it is to
+     * fetch from: local when the endpoint is this node, remote (and registered with the session) otherwise.
+     *
+     * @return function mapping the collected tree responses to the combined future of all submitted tasks
+     */
+    private AsyncFunction<List<TreeResponse>, List<SyncStat>> optimisedSyncing()
+    {
+        return trees ->
+        {
+            InetAddress localAddress = FBUtilities.getLocalAddress();
+            List<AsymmetricSyncTask> submitted = new ArrayList<>();
+
+            // We need to difference all trees one against another
+            DifferenceHolder allDifferences = new DifferenceHolder(trees);
+            logger.debug("diffs = {}", allDifferences);
+
+            PreferedNodeFilter sameDCOnly = (streaming, candidates) ->
+                                            candidates.stream()
+                                                      .filter(node -> getDC(streaming).equals(getDC(node)))
+                                                      .collect(Collectors.toSet());
+            ImmutableMap<InetAddress, HostDifferences> reduced = ReduceHelper.reduce(allDifferences, sameDCOnly);
+
+            for (TreeResponse tree : trees)
+            {
+                InetAddress fetcher = tree.endpoint;
+                HostDifferences incoming = reduced.get(fetcher);
+                if (incoming == null)
+                {
+                    logger.debug("Node {} has nothing to stream", fetcher);
+                    continue;
+                }
+
+                assert incoming.get(fetcher).isEmpty() : "We should not fetch ranges from ourselves";
+                for (InetAddress source : incoming.hosts())
+                {
+                    List<Range<Token>> ranges = incoming.get(source);
+                    logger.debug("{} is about to fetch {} from {}", fetcher, ranges, source);
+
+                    AsymmetricSyncTask task;
+                    if (!fetcher.equals(localAddress))
+                    {
+                        AsymmetricRemoteSyncTask remote = new AsymmetricRemoteSyncTask(desc, fetcher, source, ranges, previewKind);
+                        // registered with the session so the task can be completed when the outcome arrives
+                        session.waitForSync(Pair.create(desc, new NodePair(fetcher, source)), remote);
+                        task = remote;
+                    }
+                    else
+                    {
+                        task = new AsymmetricLocalSyncTask(desc, source, ranges, isIncremental ? desc.parentSessionId : null, previewKind);
+                    }
+
+                    submitted.add(task);
+                    taskExecutor.submit(task);
+                }
+            }
+            return Futures.allAsList(submitted);
+        };
+    }
+
+    /**
+     * Looks up the datacenter of {@code address} through the configured endpoint snitch.
+     *
+     * @param address endpoint to look up
+     * @return the datacenter name reported by the snitch
+     */
+    private String getDC(InetAddress address)
+    {
+        return DatabaseDescriptor.getEndpointSnitch().getDatacenter(address);
+    }
+
     /**
      * Creates {@link ValidationTask} and submit them to task executor in parallel.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 3c7f890..c26d4d1 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -144,10 +144,32 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     // forwarded sync request
                     SyncRequest request = (SyncRequest) message.payload;
                     logger.debug("Syncing {}", request);
-                    StreamingRepairTask task = new StreamingRepairTask(desc, request, isIncremental(desc.parentSessionId) ? desc.parentSessionId : null, request.previewKind);
+                    StreamingRepairTask task = new StreamingRepairTask(desc,
+                                                                       request.initiator,
+                                                                       request.src,
+                                                                       request.dst,
+                                                                       request.ranges,
+                                                                       isIncremental(desc.parentSessionId) ? desc.parentSessionId : null,
+                                                                       request.previewKind,
+                                                                       false);
                     task.run();
                     break;
 
+                case ASYMMETRIC_SYNC_REQUEST:
+                    // forwarded sync request
+                    AsymmetricSyncRequest asymmetricSyncRequest = (AsymmetricSyncRequest) message.payload;
+                    logger.debug("Syncing {}", asymmetricSyncRequest);
+                    StreamingRepairTask asymmetricTask = new StreamingRepairTask(desc,
+                                                                                 asymmetricSyncRequest.initiator,
+                                                                                 asymmetricSyncRequest.fetchingNode,
+                                                                                 asymmetricSyncRequest.fetchFrom,
+                                                                                 asymmetricSyncRequest.ranges,
+                                                                                 isIncremental(desc.parentSessionId) ? desc.parentSessionId : null,
+                                                                                 asymmetricSyncRequest.previewKind,
+                                                                                 true);
+                    asymmetricTask.run();
+                    break;
+
                 case CLEANUP:
                     logger.debug("cleaning up repair");
                     CleanupMessage cleanup = (CleanupMessage) message.payload;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 2b67a3c..1c9778b 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -515,6 +515,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                                                                                      options.isPullRepair(),
                                                                                      force,
                                                                                      options.getPreviewKind(),
+                                                                                     options.optimiseStreams(),
                                                                                      executor,
                                                                                      cfnames);
             if (session == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 5dbd050..609ec56 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -104,10 +104,11 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
     private final ConcurrentMap<Pair<RepairJobDesc, InetAddress>, ValidationTask> validating = new ConcurrentHashMap<>();
     // Remote syncing jobs wait response in syncingTasks map
-    private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
 
     // Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor
     public final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask"));
+    private final boolean optimiseStreams;
 
     private volatile boolean terminated = false;
 
@@ -134,6 +135,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
                          boolean pullRepair,
                          boolean force,
                          PreviewKind previewKind,
+                         boolean optimiseStreams,
                          String... cfnames)
     {
         assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
@@ -174,6 +176,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         this.previewKind = previewKind;
         this.pullRepair = pullRepair;
         this.skippedReplicas = forceSkippedReplicas;
+        this.optimiseStreams = optimiseStreams;
     }
 
     public UUID getId()
@@ -191,11 +194,12 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         validating.put(key, task);
     }
 
-    public void waitForSync(Pair<RepairJobDesc, NodePair> key, RemoteSyncTask task)
+    public void waitForSync(Pair<RepairJobDesc, NodePair> key, CompletableRemoteSyncTask task)
     {
         syncingTasks.put(key, task);
     }
 
+
     /**
      * Receive merkle tree response or failed response from {@code endpoint} for current repair job.
      *
@@ -227,7 +231,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
      */
     public void syncComplete(RepairJobDesc desc, NodePair nodes, boolean success, List<SessionSummary> summaries)
     {
-        RemoteSyncTask task = syncingTasks.get(Pair.create(desc, nodes));
+        CompletableRemoteSyncTask task = syncingTasks.get(Pair.create(desc, nodes));
         if (task == null)
         {
             assert terminated;
@@ -301,7 +305,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         List<ListenableFuture<RepairResult>> jobs = new ArrayList<>(cfnames.length);
         for (String cfname : cfnames)
         {
-            RepairJob job = new RepairJob(this, cfname, isIncremental, previewKind);
+            RepairJob job = new RepairJob(this, cfname, isIncremental, previewKind, optimiseStreams);
             executor.execute(job);
             jobs.add(job);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index f43010b..a1b7459 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -20,15 +20,17 @@ package org.apache.cassandra.repair;
 import java.net.InetAddress;
 import java.util.UUID;
 import java.util.Collections;
+import java.util.Collection;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.SyncComplete;
-import org.apache.cassandra.repair.messages.SyncRequest;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.StreamEvent;
 import org.apache.cassandra.streaming.StreamEventHandler;
@@ -45,34 +47,44 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
     private static final Logger logger = LoggerFactory.getLogger(StreamingRepairTask.class);
 
     private final RepairJobDesc desc;
-    private final SyncRequest request;
+    private final boolean asymmetric;
+    private final InetAddress initiator;
+    private final InetAddress src;
+    private final InetAddress dst;
+    private final Collection<Range<Token>> ranges;
     private final UUID pendingRepair;
     private final PreviewKind previewKind;
 
-    public StreamingRepairTask(RepairJobDesc desc, SyncRequest request, UUID pendingRepair, PreviewKind previewKind)
+    public StreamingRepairTask(RepairJobDesc desc, InetAddress initiator, InetAddress src, InetAddress dst, Collection<Range<Token>> ranges,  UUID pendingRepair, PreviewKind previewKind, boolean asymmetric)
     {
         this.desc = desc;
-        this.request = request;
+        this.initiator = initiator;
+        this.src = src;
+        this.dst = dst;
+        this.ranges = ranges;
+        this.asymmetric = asymmetric;
         this.pendingRepair = pendingRepair;
         this.previewKind = previewKind;
     }
 
     public void run()
     {
-        InetAddress dest = request.dst;
+        InetAddress dest = dst;
         InetAddress preferred = SystemKeyspace.getPreferredIP(dest);
-        logger.info("[streaming task #{}] Performing streaming repair of {} ranges with {}", desc.sessionId, request.ranges.size(), request.dst);
+        logger.info("[streaming task #{}] Performing streaming repair of {} ranges with {}", desc.sessionId, ranges.size(), dst);
         createStreamPlan(dest, preferred).execute();
     }
 
     @VisibleForTesting
     StreamPlan createStreamPlan(InetAddress dest, InetAddress preferred)
     {
-        return new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind)
+        StreamPlan sp = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind)
                .listeners(this)
                .flushBeforeTransfer(pendingRepair == null) // sstables are isolated at the beginning of an incremental repair session, so flushing isn't neccessary
-               .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily) // request ranges from the remote node
-               .transferRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily); // send ranges to the remote node
+               .requestRanges(dest, preferred, desc.keyspace, ranges, desc.columnFamily); // request ranges from the remote node
+        if (!asymmetric)
+            sp.transferRanges(dest, preferred, desc.keyspace, ranges, desc.columnFamily); // send ranges to the remote node
+        return sp;
     }
 
     public void handleStreamEvent(StreamEvent event)
@@ -86,8 +98,8 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
      */
     public void onSuccess(StreamState state)
     {
-        logger.info("{} streaming task succeed, returning response to {}", previewKind.logPrefix(desc.sessionId), request.initiator);
-        MessagingService.instance().sendOneWay(new SyncComplete(desc, request.src, request.dst, true, state.createSummaries()).createMessage(), request.initiator);
+        logger.info("[repair #{}] streaming task succeed, returning response to {}", desc.sessionId, initiator);
+        MessagingService.instance().sendOneWay(new SyncComplete(desc, src, dst, true, state.createSummaries()).createMessage(), initiator);
     }
 
     /**
@@ -95,6 +107,6 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
      */
     public void onFailure(Throwable t)
     {
-        MessagingService.instance().sendOneWay(new SyncComplete(desc, request.src, request.dst, false, Collections.emptyList()).createMessage(), request.initiator);
+        MessagingService.instance().sendOneWay(new SyncComplete(desc, src, dst, false, Collections.emptyList()).createMessage(), initiator);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java b/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
new file mode 100644
index 0000000..eb99977
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.repair.TreeResponse;
+import org.apache.cassandra.utils.MerkleTrees;
+
+/**
+ * Just holds all differences between the hosts involved in a repair
+ */
+public class DifferenceHolder
+{
+    private final ImmutableMap<InetAddress, HostDifferences> differences;
+
+    public DifferenceHolder(List<TreeResponse> trees)
+    {
+        ImmutableMap.Builder<InetAddress, HostDifferences> diffBuilder = ImmutableMap.builder();
+        for (int i = 0; i < trees.size() - 1; ++i)
+        {
+            TreeResponse r1 = trees.get(i);
+            // create the differences between r1 and all other hosts:
+            HostDifferences hd = new HostDifferences();
+            for (int j = i + 1; j < trees.size(); ++j)
+            {
+                TreeResponse r2 = trees.get(j);
+                hd.add(r2.endpoint, MerkleTrees.difference(r1.trees, r2.trees));
+            }
+            // and add them to the diff map
+            diffBuilder.put(r1.endpoint, hd);
+        }
+        differences = diffBuilder.build();
+    }
+
+    @VisibleForTesting
+    DifferenceHolder(Map<InetAddress, HostDifferences> differences)
+    {
+        ImmutableMap.Builder<InetAddress, HostDifferences> diffBuilder = ImmutableMap.builder();
+        diffBuilder.putAll(differences);
+        this.differences = diffBuilder.build();
+    }
+
+    /**
+     * differences only holds one 'side' of the difference - if A and B mismatch, only A will be a key in the map
+     */
+    public Set<InetAddress> keyHosts()
+    {
+        return differences.keySet();
+    }
+
+    public HostDifferences get(InetAddress hostWithDifference)
+    {
+        return differences.get(hostWithDifference);
+    }
+
+    public String toString()
+    {
+        return "DifferenceHolder{" +
+               "differences=" + differences +
+               '}';
+    }
+
+    public boolean hasDifferenceBetween(InetAddress node1, InetAddress node2, Range<Token> range)
+    {
+        HostDifferences diffsNode1 = differences.get(node1);
+        if (diffsNode1 != null && diffsNode1.hasDifferencesFor(node2, range))
+            return true;
+        HostDifferences diffsNode2 = differences.get(node2);
+        if (diffsNode2 != null && diffsNode2.hasDifferencesFor(node1, range))
+            return true;
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java b/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
new file mode 100644
index 0000000..6cbe987
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Tracks the differences for a single host
+ */
+public class HostDifferences
+{
+    private final Map<InetAddress, List<Range<Token>>> perHostDifferences = new HashMap<>();
+
+    /**
+     * Adds a set of differences between the node this instance is tracking and endpoint
+     */
+    public void add(InetAddress endpoint, List<Range<Token>> difference)
+    {
+        perHostDifferences.put(endpoint, difference);
+    }
+
+    public void addSingleRange(InetAddress remoteNode, Range<Token> rangeToFetch)
+    {
+        perHostDifferences.computeIfAbsent(remoteNode, (x) -> new ArrayList<>()).add(rangeToFetch);
+    }
+
+    /**
+     * Does this instance have differences for range with node2?
+     */
+    public boolean hasDifferencesFor(InetAddress node2, Range<Token> range)
+    {
+        List<Range<Token>> differences = get(node2);
+        for (Range<Token> diff : differences)
+        {
+            // if the other node has a diff for this range, we know they are not equal.
+            if (range.equals(diff) || range.intersects(diff))
+                return true;
+        }
+        return false;
+    }
+
+    public Set<InetAddress> hosts()
+    {
+        return perHostDifferences.keySet();
+    }
+
+    public List<Range<Token>> get(InetAddress differingHost)
+    {
+        return perHostDifferences.getOrDefault(differingHost, Collections.emptyList());
+    }
+
+    public String toString()
+    {
+        return "HostDifferences{" +
+               "perHostDifferences=" + perHostDifferences +
+               '}';
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java b/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
new file mode 100644
index 0000000..b41ddd8
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Tracks incoming streams for a single host
+ */
+public class IncomingRepairStreamTracker
+{
+    private static final Logger logger = LoggerFactory.getLogger(IncomingRepairStreamTracker.class);
+    private final DifferenceHolder differences;
+    private final Map<Range<Token>, StreamFromOptions> incoming = new HashMap<>();
+
+    public IncomingRepairStreamTracker(DifferenceHolder differences)
+    {
+        this.differences = differences;
+    }
+
+    public String toString()
+    {
+        return "IncomingStreamTracker{" +
+               "incoming=" + incoming +
+               '}';
+    }
+
+    /**
+     * Adds a range to be streamed from streamFromNode
+     *
+     * First the currently tracked ranges are denormalized to make sure that no ranges overlap, then
+     * the streamFromNode is added to the StreamFromOptions for the range
+     *
+     * @param range the range we need to stream from streamFromNode
+     * @param streamFromNode the node we should stream from
+     */
+    public void addIncomingRangeFrom(Range<Token> range, InetAddress streamFromNode)
+    {
+        logger.trace("adding incoming range {} from {}", range, streamFromNode);
+        Set<Range<Token>> newInput = RangeDenormalizer.denormalize(range, incoming);
+        for (Range<Token> input : newInput)
+        {
+            incoming.computeIfAbsent(input, (newRange) -> new StreamFromOptions(differences, newRange)).add(streamFromNode);
+        }
+    }
+
+    public ImmutableMap<Range<Token>, StreamFromOptions> getIncoming()
+    {
+        return ImmutableMap.copyOf(incoming);
+    }
+}
+
+
+
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java b/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java
new file mode 100644
index 0000000..90788dc
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.net.InetAddress;
+import java.util.Set;
+
+public interface PreferedNodeFilter
+{
+    public Set<InetAddress> apply(InetAddress streamingNode, Set<InetAddress> toStream);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java b/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java
new file mode 100644
index 0000000..a04d6d5
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+public class RangeDenormalizer
+{
+    private static final Logger logger = LoggerFactory.getLogger(RangeDenormalizer.class);
+
+    /**
+     * "Denormalizes" (kind of the opposite of what Range.normalize does) the key ranges of {@code incoming} against
+     * {@code range}. NOTE: {@code incoming} is modified - keys overlapping {@code range} are removed and re-added split.
+     * It makes sure that if there is an intersection between {@code range} and some of the ranges in {@code incoming.keySet()}
+     * all intersections are keys in the updated {@code incoming}; returns {@code range} split into non-overlapping pieces.
+     */
+    public static Set<Range<Token>> denormalize(Range<Token> range, Map<Range<Token>, StreamFromOptions> incoming)
+    {
+        logger.trace("Denormalizing range={} incoming={}", range, incoming);
+        Set<Range<Token>> existingRanges = new HashSet<>(incoming.keySet());
+        Map<Range<Token>, StreamFromOptions> existingOverlappingRanges = new HashMap<>();
+        // remove all overlapping ranges from the incoming map, remembering their stream options
+        for (Range<Token> existingRange : existingRanges)
+        {
+            if (range.intersects(existingRange))
+                existingOverlappingRanges.put(existingRange, incoming.remove(existingRange));
+        }
+
+        Set<Range<Token>> intersections = intersection(existingRanges, range);
+        Set<Range<Token>> newExisting = Sets.union(subtractFromAllRanges(existingOverlappingRanges.keySet(), range), intersections);
+        Set<Range<Token>> newInput = Sets.union(range.subtractAll(existingOverlappingRanges.keySet()), intersections);
+        assertNonOverLapping(newExisting);
+        assertNonOverLapping(newInput);
+        for (Range<Token> r : newExisting)
+        {
+            for (Map.Entry<Range<Token>, StreamFromOptions> entry : existingOverlappingRanges.entrySet())
+            {
+                if (r.intersects(entry.getKey()))
+                    incoming.put(r, entry.getValue().copy(r));
+            }
+        }
+        logger.trace("denormalized {} to {}", range, newInput);
+        logger.trace("denormalized incoming to {}", incoming);
+        assertNonOverLapping(incoming.keySet());
+        return newInput;
+    }
+
+    /**
+     * Subtract the given range from all the input ranges.
+     *
+     * for example:
+     * ranges = [(0, 10], (20, 30]]
+     * and range = (8, 22]
+     *
+     * the result should be [(0, 8], (22, 30]]
+     *
+     */
+    @VisibleForTesting
+    static Set<Range<Token>> subtractFromAllRanges(Collection<Range<Token>> ranges, Range<Token> range)
+    {
+        Set<Range<Token>> result = new HashSet<>();
+        for (Range<Token> r : ranges)
+            result.addAll(r.subtract(range)); // subtract can return two ranges if we remove the middle part
+        return result;
+    }
+
+    /**
+     * Makes sure none of the input ranges overlap; throws an AssertionError listing the ranges if any do
+     */
+    private static void assertNonOverLapping(Set<Range<Token>> ranges)
+    {
+        List<Range<Token>> sortedRanges = Range.sort(ranges);
+        Token lastToken = null;
+        for (Range<Token> range : sortedRanges)
+        {
+            if (lastToken != null && lastToken.compareTo(range.left) > 0)
+            {
+                throw new AssertionError("Ranges are overlapping: "+ranges);
+            }
+            lastToken = range.right;
+        }
+    }
+
+    /**
+     * Returns all intersections between the ranges in ranges and the given range
+     */
+    private static Set<Range<Token>> intersection(Collection<Range<Token>> ranges, Range<Token> range)
+    {
+        Set<Range<Token>> result = new HashSet<>();
+        for (Range<Token> r : ranges)
+            result.addAll(range.intersectionWith(r));
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
new file mode 100644
index 0000000..ce05e93
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Basic idea is that we track incoming ranges instead of blindly just exchanging the ranges that mismatch between two nodes
+ *
+ * Say node X has tracked that it will stream range r1 from node Y. Now we find a differing range
+ * r1 between node X and Z. When adding r1 from Z as an incoming to X we check if Y and Z are equal on range r1 (i.e., there is
+ * no difference between them). If they are equal, X can stream from either Y or Z and the end result will be the same.
+ *
+ * The ranges won't match perfectly since we don't iterate over leaves, so we always split based on the
+ * smallest range (either the new difference or the existing one)
+ */
+public class ReduceHelper
+{
+    /**
+     * Reduces the differences provided by the merkle trees to a minimum set of differences
+     */
+    public static ImmutableMap<InetAddress, HostDifferences> reduce(DifferenceHolder differences, PreferedNodeFilter filter)
+    {
+        Map<InetAddress, IncomingRepairStreamTracker> trackers = createIncomingRepairStreamTrackers(differences);
+        Map<InetAddress, Integer> outgoingStreamCounts = new HashMap<>();
+        ImmutableMap.Builder<InetAddress, HostDifferences> mapBuilder = ImmutableMap.builder();
+        for (Map.Entry<InetAddress, IncomingRepairStreamTracker> trackerEntry : trackers.entrySet())
+        {
+            IncomingRepairStreamTracker tracker = trackerEntry.getValue();
+            HostDifferences rangesToFetch = new HostDifferences();
+            for (Map.Entry<Range<Token>, StreamFromOptions> entry : tracker.getIncoming().entrySet())
+            {
+                Range<Token> rangeToFetch = entry.getKey();
+                for (InetAddress remoteNode : pickLeastStreaming(trackerEntry.getKey(), entry.getValue(), outgoingStreamCounts, filter))
+                    rangesToFetch.addSingleRange(remoteNode, rangeToFetch);
+            }
+            mapBuilder.put(trackerEntry.getKey(), rangesToFetch);
+        }
+
+        return mapBuilder.build();
+    }
+
+    @VisibleForTesting
+    static Map<InetAddress, IncomingRepairStreamTracker> createIncomingRepairStreamTrackers(DifferenceHolder differences)
+    {
+        Map<InetAddress, IncomingRepairStreamTracker> trackers = new HashMap<>();
+
+        for (InetAddress hostWithDifference : differences.keyHosts())
+        {
+            HostDifferences hostDifferences = differences.get(hostWithDifference);
+            for (InetAddress differingHost : hostDifferences.hosts())
+            {
+                List<Range<Token>> differingRanges = hostDifferences.get(differingHost);
+                // hostWithDifference has mismatching ranges differingRanges with differingHost:
+                for (Range<Token> range : differingRanges)
+                {
+                    // a difference means that we need to sync that range between two nodes - add the diffing range to both
+                    // hosts:
+                    getTracker(differences, trackers, hostWithDifference).addIncomingRangeFrom(range, differingHost);
+                    getTracker(differences, trackers, differingHost).addIncomingRangeFrom(range, hostWithDifference);
+                }
+            }
+        }
+        return trackers;
+    }
+
+    private static IncomingRepairStreamTracker getTracker(DifferenceHolder differences,
+                                                          Map<InetAddress, IncomingRepairStreamTracker> trackers,
+                                                          InetAddress host)
+    {
+        return trackers.computeIfAbsent(host, (h) -> new IncomingRepairStreamTracker(differences));
+    }
+
+    // greedily pick the nodes doing the least amount of streaming
+    private static Collection<InetAddress> pickLeastStreaming(InetAddress streamingNode,
+                                                              StreamFromOptions toStreamFrom,
+                                                              Map<InetAddress, Integer> outgoingStreamCounts,
+                                                              PreferedNodeFilter filter)
+    {
+        Set<InetAddress> retSet = new HashSet<>();
+        for (Set<InetAddress> toStream : toStreamFrom.allStreams())
+        {
+            InetAddress candidate = null;
+            Set<InetAddress> prefered = filter.apply(streamingNode, toStream);
+            for (InetAddress node : prefered)
+            {
+                if (candidate == null || outgoingStreamCounts.getOrDefault(candidate, 0) > outgoingStreamCounts.getOrDefault(node, 0))
+                {
+                    candidate = node;
+                }
+            }
+            // ok, found no prefered hosts, try all of them
+            if (candidate == null)
+            {
+                for (InetAddress node : toStream)
+                {
+                    if (candidate == null || outgoingStreamCounts.getOrDefault(candidate, 0) > outgoingStreamCounts.getOrDefault(node, 0))
+                    {
+                        candidate = node;
+                    }
+                }
+            }
+            assert candidate != null;
+            outgoingStreamCounts.put(candidate, outgoingStreamCounts.getOrDefault(candidate, 0) + 1);
+            retSet.add(candidate);
+        }
+        return retSet;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java b/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
new file mode 100644
index 0000000..4516f23
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Keeps track of where a node needs to stream a given range from.
+ *
+ * If the remote range is identical on several remote nodes, this class keeps track of all of them.
+ *
+ * These stream-from options get 'split' during denormalization - for example if we track range
+ * (100, 200] and we find a new differing range (180, 200] - then the denormalization will create two
+ * new StreamFromOptions (see {@link #copy}) with the same streamOptions, one with range (100, 180] and one
+ * with (180, 200] - then it adds the new incoming difference to the StreamFromOptions for the new range (180, 200].
+ */
+public class StreamFromOptions
+{
+    /**
+     * All differences - used to figure out if two nodes are equal on the range
+     */
+    private final DifferenceHolder differences;
+    /**
+     * The range to stream
+     */
+    @VisibleForTesting
+    final Range<Token> range;
+    /**
+     * Contains the hosts to stream from - if two nodes are in the same inner set, they are identical for the range we are handling
+     */
+    private final Set<Set<InetAddress>> streamOptions = new HashSet<>();
+
+    public StreamFromOptions(DifferenceHolder differences, Range<Token> range)
+    {
+        this(differences, range, Collections.emptySet());
+    }
+
+    private StreamFromOptions(DifferenceHolder differences, Range<Token> range, Set<Set<InetAddress>> existing)
+    {
+        this.differences = differences;
+        this.range = range;
+        for (Set<InetAddress> addresses : existing) // copy each inner set so later add() calls do not leak into the source instance
+            this.streamOptions.add(Sets.newHashSet(addresses));
+    }
+
+    /**
+     * Add new node to the stream options
+     *
+     * If we have no difference between the new node and a currently tracked one, we know they are matching over the
+     * range we are tracking, then just add it to the set with the identical remote nodes. Otherwise create a new group
+     * of nodes containing this new node.
+     */
+    public void add(InetAddress streamFromNode)
+    {
+        for (Set<InetAddress> options : streamOptions)
+        {
+            InetAddress first = options.iterator().next();
+            if (!differences.hasDifferenceBetween(first, streamFromNode, range))
+            {
+                options.add(streamFromNode); // NOTE(review): mutates an element of the outer HashSet; this class only ever iterates streamOptions
+                return;
+            }
+        }
+        streamOptions.add(Sets.newHashSet(streamFromNode));
+    }
+
+    public StreamFromOptions copy(Range<Token> withRange)
+    {
+        return new StreamFromOptions(differences, withRange, streamOptions);
+    }
+
+    public Iterable<Set<InetAddress>> allStreams()
+    {
+        return streamOptions;
+    }
+
+    public String toString()
+    {
+        return "StreamFromOptions{" +
+               "range=" + range +
+               ", streamOptions=" + streamOptions +
+               '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
new file mode 100644
index 0000000..b75ad7f
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.streaming.PreviewKind;
+
+public class AsymmetricSyncRequest extends RepairMessage
+{
+    public static MessageSerializer serializer = new SyncRequestSerializer();
+
+    public final InetAddress initiator;
+    public final InetAddress fetchingNode;
+    public final InetAddress fetchFrom;
+    public final Collection<Range<Token>> ranges;
+    public final PreviewKind previewKind;
+
+    public AsymmetricSyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress fetchingNode, InetAddress fetchFrom, Collection<Range<Token>> ranges, PreviewKind previewKind)
+    {
+        super(Type.ASYMMETRIC_SYNC_REQUEST, desc);
+        this.initiator = initiator;
+        this.fetchingNode = fetchingNode;
+        this.fetchFrom = fetchFrom;
+        this.ranges = ranges;
+        this.previewKind = previewKind;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof AsymmetricSyncRequest))
+            return false;
+        AsymmetricSyncRequest req = (AsymmetricSyncRequest)o;
+        return messageType == req.messageType && desc.equals(req.desc) &&
+               initiator.equals(req.initiator) &&
+               fetchingNode.equals(req.fetchingNode) &&
+               fetchFrom.equals(req.fetchFrom) &&
+               ranges.equals(req.ranges) &&
+               previewKind == req.previewKind;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(messageType, desc, initiator, fetchingNode, fetchFrom, ranges, previewKind);
+    }
+
+    public static class SyncRequestSerializer implements MessageSerializer<AsymmetricSyncRequest>
+    {
+        public void serialize(AsymmetricSyncRequest message, DataOutputPlus out, int version) throws IOException
+        {
+            RepairJobDesc.serializer.serialize(message.desc, out, version);
+            CompactEndpointSerializationHelper.serialize(message.initiator, out);
+            CompactEndpointSerializationHelper.serialize(message.fetchingNode, out);
+            CompactEndpointSerializationHelper.serialize(message.fetchFrom, out);
+            out.writeInt(message.ranges.size());
+            for (Range<Token> range : message.ranges)
+            {
+                MessagingService.validatePartitioner(range);
+                AbstractBounds.tokenSerializer.serialize(range, out, version);
+            }
+            out.writeInt(message.previewKind.getSerializationVal());
+        }
+
+        public AsymmetricSyncRequest deserialize(DataInputPlus in, int version) throws IOException
+        {
+            RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
+            InetAddress initiator = CompactEndpointSerializationHelper.deserialize(in);
+            InetAddress fetchingNode = CompactEndpointSerializationHelper.deserialize(in);
+            InetAddress fetchFrom = CompactEndpointSerializationHelper.deserialize(in);
+            int rangesCount = in.readInt();
+            List<Range<Token>> ranges = new ArrayList<>(rangesCount);
+            for (int i = 0; i < rangesCount; ++i)
+                ranges.add((Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(), version));
+            PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
+            return new AsymmetricSyncRequest(desc, initiator, fetchingNode, fetchFrom, ranges, previewKind);
+        }
+        // Each endpoint is sized on its own rather than assuming all three encode to the same length (e.g. mixed IPv4/IPv6).
+        public long serializedSize(AsymmetricSyncRequest message, int version)
+        {
+            long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
+            size += CompactEndpointSerializationHelper.serializedSize(message.initiator) + CompactEndpointSerializationHelper.serializedSize(message.fetchingNode) + CompactEndpointSerializationHelper.serializedSize(message.fetchFrom);
+            size += TypeSizes.sizeof(message.ranges.size());
+            for (Range<Token> range : message.ranges)
+                size += AbstractBounds.tokenSerializer.serializedSize(range, version);
+            size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
+            return size;
+        }
+    }
+
+    public String toString()
+    {
+        return "AsymmetricSyncRequest{" +
+               "initiator=" + initiator +
+               ", fetchingNode=" + fetchingNode +
+               ", fetchFrom=" + fetchFrom +
+               ", ranges=" + ranges +
+               ", previewKind=" + previewKind +
+               ", desc=" + desc +
+               '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index b72f139..09c6060 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -60,7 +60,8 @@ public abstract class RepairMessage
         FINALIZE_COMMIT(12, FinalizeCommit.serializer),
         FAILED_SESSION(13, FailSession.serializer),
         STATUS_REQUEST(14, StatusRequest.serializer),
-        STATUS_RESPONSE(15, StatusResponse.serializer);
+        STATUS_RESPONSE(15, StatusResponse.serializer),
+        ASYMMETRIC_SYNC_REQUEST(16, AsymmetricSyncRequest.serializer);
 
         private final byte type;
         private final MessageSerializer<RepairMessage> serializer;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index 971bf5d..adcd776 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -50,6 +50,7 @@ public class RepairOption
     public static final String PULL_REPAIR_KEY = "pullRepair";
     public static final String FORCE_REPAIR_KEY = "forceRepair";
     public static final String PREVIEW = "previewKind";
+    public static final String OPTIMISE_STREAMS_KEY = "optimiseStreams";
 
     // we don't want to push nodes too much for repair
     public static final int MAX_JOB_THREADS = 4;
@@ -131,6 +132,12 @@ public class RepairOption
      *             <td>"true" if the repair should continue, even if one of the replicas involved is down.
      *             <td>false</td>
      *         </tr>
+     *         <tr>
+     *             <td>optimiseStreams</td>
+     *             <td>"true" if we should try to optimise the syncing to avoid transferring identical
+     *             ranges to the same host multiple times</td>
+     *             <td>false</td>
+     *         </tr>
      *     </tbody>
      * </table>
      *
@@ -180,8 +187,9 @@ public class RepairOption
                 ranges.add(new Range<>(parsedBeginToken, parsedEndToken));
             }
         }
+        boolean asymmetricSyncing = Boolean.parseBoolean(options.get(OPTIMISE_STREAMS_KEY));
 
-        RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, previewKind);
+        RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, previewKind, asymmetricSyncing);
 
         // data centers
         String dataCentersStr = options.get(DATACENTERS_KEY);
@@ -259,13 +267,14 @@ public class RepairOption
     private final boolean pullRepair;
     private final boolean forceRepair;
     private final PreviewKind previewKind;
+    private final boolean optimiseStreams;
 
     private final Collection<String> columnFamilies = new HashSet<>();
     private final Collection<String> dataCenters = new HashSet<>();
     private final Collection<String> hosts = new HashSet<>();
     private final Collection<Range<Token>> ranges = new HashSet<>();
 
-    public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, boolean forceRepair, PreviewKind previewKind)
+    public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, boolean forceRepair, PreviewKind previewKind, boolean optimiseStreams)
     {
         if (FBUtilities.isWindows &&
             (DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard) &&
@@ -286,6 +295,7 @@ public class RepairOption
         this.pullRepair = pullRepair;
         this.forceRepair = forceRepair;
         this.previewKind = previewKind;
+        this.optimiseStreams = optimiseStreams;
     }
 
     public RepairParallelism getParallelism()
@@ -363,10 +373,16 @@ public class RepairOption
         return previewKind.isPreview();
     }
 
-    public boolean isInLocalDCOnly() {
+    public boolean isInLocalDCOnly()
+    {
         return dataCenters.size() == 1 && dataCenters.contains(DatabaseDescriptor.getLocalDataCenter());
     }
 
+    public boolean optimiseStreams()
+    {
+        return optimiseStreams;
+    }
+
     @Override
     public String toString()
     {
@@ -382,6 +398,7 @@ public class RepairOption
                ", # of ranges: " + ranges.size() +
                ", pull repair: " + pullRepair +
                ", force repair: " + forceRepair +
+               ", optimise streams: "+ optimiseStreams +
                ')';
     }
 
@@ -401,6 +418,7 @@ public class RepairOption
         options.put(PULL_REPAIR_KEY, Boolean.toString(pullRepair));
         options.put(FORCE_REPAIR_KEY, Boolean.toString(forceRepair));
         options.put(PREVIEW, previewKind.toString());
+        options.put(OPTIMISE_STREAMS_KEY, Boolean.toString(optimiseStreams));
         return options;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index ef3ffeb..0276238 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -213,6 +213,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
                                              boolean pullRepair,
                                              boolean force,
                                              PreviewKind previewKind,
+                                             boolean optimiseStreams,
                                              ListeningExecutorService executor,
                                              String... cfnames)
     {
@@ -222,7 +223,8 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         if (cfnames.length == 0)
             return null;
 
-        final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isIncremental, pullRepair, force, previewKind, cfnames);
+
+        final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isIncremental, pullRepair, force, previewKind, optimiseStreams, cfnames);
 
         sessions.put(session.getId(), session);
         // register listeners

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/tools/nodetool/Repair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
index 86c29d4..8347afc 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
@@ -94,6 +94,10 @@ public class Repair extends NodeToolCmd
     @Option(title = "pull_repair", name = {"-pl", "--pull"}, description = "Use --pull to perform a one way repair where data is only streamed from a remote node to this node.")
     private boolean pullRepair = false;
 
+    @Option(title = "optimise_streams", name = {"-os", "--optimise-streams"}, description = "Use --optimise-streams to try to reduce the number of streams we do (EXPERIMENTAL, see CASSANDRA-3200).")
+    private boolean optimiseStreams = false;
+
+
     private PreviewKind getPreviewKind()
     {
         if (validate)
@@ -144,7 +148,7 @@ public class Repair extends NodeToolCmd
             options.put(RepairOption.PULL_REPAIR_KEY, Boolean.toString(pullRepair));
             options.put(RepairOption.FORCE_REPAIR_KEY, Boolean.toString(force));
             options.put(RepairOption.PREVIEW, getPreviewKind().toString());
-
+            options.put(RepairOption.OPTIMISE_STREAMS_KEY, Boolean.toString(optimiseStreams));
             if (!startToken.isEmpty() || !endToken.isEmpty())
             {
                 options.put(RepairOption.RANGES_KEY, startToken + ":" + endToken);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index f5e9d6b..7f3dbff 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.repair;
 import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
@@ -46,7 +47,6 @@ import org.apache.cassandra.utils.MerkleTree;
 import org.apache.cassandra.utils.MerkleTrees;
 import org.apache.cassandra.utils.UUIDGen;
 
-import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
 import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
 
 import static org.junit.Assert.assertEquals;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org