You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/12/01 11:42:17 UTC

[cassandra] branch trunk updated: Optimised repair streaming improvements

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 22abff7  Optimised repair streaming improvements
22abff7 is described below

commit 22abff779df097e0ef38180442e9c680b3d41187
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Fri Nov 13 09:47:38 2020 +0100

    Optimised repair streaming improvements
    
    Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-16274
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |   5 +
 .../cassandra/config/DatabaseDescriptor.java       |  38 +++
 src/java/org/apache/cassandra/net/Verb.java        |   2 -
 .../cassandra/repair/AsymmetricRemoteSyncTask.java |  12 +-
 .../org/apache/cassandra/repair/RepairJob.java     |   3 +-
 .../cassandra/repair/RepairMessageVerbHandler.java |  17 +-
 .../cassandra/repair/StreamingRepairTask.java      |   2 +-
 .../cassandra/repair/SymmetricRemoteSyncTask.java  |   2 +-
 .../repair/asymmetric/HostDifferences.java         |  45 +++-
 .../asymmetric/IncomingRepairStreamTracker.java    |   6 +-
 .../repair/asymmetric/RangeDenormalizer.java       |  74 +++---
 .../cassandra/repair/asymmetric/RangeMap.java      | 269 +++++++++++++++++++++
 .../cassandra/repair/asymmetric/ReduceHelper.java  |  67 ++---
 .../repair/messages/AsymmetricSyncRequest.java     | 133 ----------
 .../cassandra/repair/messages/RepairOption.java    |  19 +-
 .../cassandra/repair/messages/SyncRequest.java     |  19 +-
 .../apache/cassandra/service/StorageService.java   |  30 +++
 .../cassandra/service/StorageServiceMBean.java     |   8 +
 .../data/serialization/4.0/service.SyncRequest.bin | Bin 110 -> 111 bytes
 .../test/OptimiseStreamsRepairTest.java            | 195 +++++++++++++++
 .../org/apache/cassandra/repair/RepairJobTest.java |  55 +++--
 .../cassandra/repair/StreamingRepairTaskTest.java  |   4 +-
 .../repair/asymmetric/RangeDenormalizerTest.java   |   8 +-
 .../cassandra/repair/asymmetric/RangeMapTest.java  | 106 ++++++++
 .../repair/asymmetric/ReduceHelperTest.java        |  44 +++-
 .../messages/RepairMessageSerializationsTest.java  |   2 +-
 .../cassandra/service/SerializationsTest.java      |   3 +-
 28 files changed, 872 insertions(+), 297 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 56843d7..8e19522 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta4
+ * Optimised repair streaming improvements (CASSANDRA-16274)
  * Update jctools dependency to 3.1.0 (CASSANDRA-16255)
  * 'SSLEngine closed already' exception on failed outbound connection (CASSANDRA-16277)
  * Drain and/or shutdown might throw because of slow messaging service shutdown (CASSANDRA-16276)
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 464f8ad..fbc7f98 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -533,6 +533,11 @@ public class Config
 
     public boolean autocompaction_on_startup_enabled = Boolean.parseBoolean(System.getProperty("cassandra.autocompaction_on_startup_enabled", "true"));
 
+    // see CASSANDRA-3200 / CASSANDRA-16274
+    public volatile boolean auto_optimise_inc_repair_streams = false;
+    public volatile boolean auto_optimise_full_repair_streams = false;
+    public volatile boolean auto_optimise_preview_repair_streams = false;
+
     /**
      * Client mode means that the process is a pure client, that uses C* code base but does
      * not read or write local C* database files.
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 30a30e5..d559d63 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -3222,4 +3222,42 @@ public class DatabaseDescriptor
     {
         return conf.autocompaction_on_startup_enabled;
     }
+
+    public static boolean autoOptimiseIncRepairStreams()
+    {
+        return conf.auto_optimise_inc_repair_streams;
+    }
+
+    public static void setAutoOptimiseIncRepairStreams(boolean enabled)
+    {
+        if (enabled != conf.auto_optimise_inc_repair_streams)
+            logger.info("Changing auto_optimise_inc_repair_streams from {} to {}", conf.auto_optimise_inc_repair_streams, enabled);
+        conf.auto_optimise_inc_repair_streams = enabled;
+    }
+
+    public static boolean autoOptimiseFullRepairStreams()
+    {
+        return conf.auto_optimise_full_repair_streams;
+    }
+
+    public static void setAutoOptimiseFullRepairStreams(boolean enabled)
+    {
+        if (enabled != conf.auto_optimise_full_repair_streams)
+            logger.info("Changing auto_optimise_full_repair_streams from {} to {}", conf.auto_optimise_full_repair_streams, enabled);
+        conf.auto_optimise_full_repair_streams = enabled;
+    }
+
+    public static boolean autoOptimisePreviewRepairStreams()
+    {
+        return conf.auto_optimise_preview_repair_streams;
+    }
+
+    public static void setAutoOptimisePreviewRepairStreams(boolean enabled)
+    {
+        if (enabled != conf.auto_optimise_preview_repair_streams)
+            logger.info("Changing auto_optimise_preview_repair_streams from {} to {}", conf.auto_optimise_preview_repair_streams, enabled);
+        conf.auto_optimise_preview_repair_streams = enabled;
+    }
+
+
 }
diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java
index 2ef981d..fad2fbf 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -56,7 +56,6 @@ import org.apache.cassandra.hints.HintMessage;
 import org.apache.cassandra.hints.HintVerbHandler;
 import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
 import org.apache.cassandra.repair.RepairMessageVerbHandler;
-import org.apache.cassandra.repair.messages.AsymmetricSyncRequest;
 import org.apache.cassandra.repair.messages.CleanupMessage;
 import org.apache.cassandra.repair.messages.FailSession;
 import org.apache.cassandra.repair.messages.FinalizeCommit;
@@ -164,7 +163,6 @@ public enum Verb
     FAILED_SESSION_MSG     (113, P1, rpcTimeout,      ANTI_ENTROPY,      () -> FailSession.serializer,               () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
     STATUS_RSP             (115, P1, rpcTimeout,      ANTI_ENTROPY,      () -> StatusResponse.serializer,            () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
     STATUS_REQ             (114, P1, rpcTimeout,      ANTI_ENTROPY,      () -> StatusRequest.serializer,             () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
-    ASYMMETRIC_SYNC_REQ    (116, P1, rpcTimeout,      ANTI_ENTROPY,      () -> AsymmetricSyncRequest.serializer,     () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 
     REPLICATION_DONE_RSP   (82,  P0, rpcTimeout,      MISC,              () -> NoPayload.serializer,                 () -> ResponseVerbHandler.instance                             ),
     REPLICATION_DONE_REQ   (22,  P0, rpcTimeout,      MISC,              () -> NoPayload.serializer,                 () -> ReplicationDoneVerbHandler.instance, REPLICATION_DONE_RSP),
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
index cf6d84b..40a1f51 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
@@ -26,16 +26,16 @@ import org.apache.cassandra.exceptions.RepairException;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.AsymmetricSyncRequest;
+import org.apache.cassandra.repair.messages.SyncRequest;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.SessionSummary;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.apache.cassandra.net.Verb.ASYMMETRIC_SYNC_REQ;
+import static org.apache.cassandra.net.Verb.SYNC_REQ;
 
 /**
- * AsymmetricRemoteSyncTask sends {@link AsymmetricSyncRequest} to target node to repair(stream)
+ * AsymmetricRemoteSyncTask sends {@link SyncRequest} to target node to repair(stream)
  * data with other target replica.
  *
  * When AsymmetricRemoteSyncTask receives SyncComplete from the target, task completes.
@@ -50,10 +50,10 @@ public class AsymmetricRemoteSyncTask extends SyncTask implements CompletableRem
     public void startSync()
     {
         InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
-        AsymmetricSyncRequest request = new AsymmetricSyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind);
-        String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.fetchingNode, request.fetchFrom);
+        SyncRequest request = new SyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind, true);
+        String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst);
         Tracing.traceRepair(message);
-        MessagingService.instance().send(Message.out(ASYMMETRIC_SYNC_REQ, request), request.fetchingNode);
+        MessagingService.instance().send(Message.out(SYNC_REQ, request), request.src);
     }
 
     public void syncComplete(boolean success, List<SessionSummary> summaries)
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 341b0fd..825d659 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -322,7 +322,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
                 Preconditions.checkArgument(streamsFor.get(address).isEmpty(), "We should not fetch ranges from ourselves");
                 for (InetAddressAndPort fetchFrom : streamsFor.hosts())
                 {
-                    List<Range<Token>> toFetch = streamsFor.get(fetchFrom);
+                    List<Range<Token>> toFetch = new ArrayList<>(streamsFor.get(fetchFrom));
                     assert !toFetch.isEmpty();
 
                     logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom);
@@ -347,6 +347,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         }
         logger.info("Created {} optimised sync tasks based on {} merkle tree responses for {} (took: {}ms)",
                     syncTasks.size(), trees.size(), desc.parentSessionId, System.currentTimeMillis() - startedAt);
+        logger.debug("Optimised sync tasks for {}: {}", desc.parentSessionId, syncTasks);
         return syncTasks;
     }
 
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 51518cb..2bf6d84 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -152,25 +152,10 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                                                                        request.ranges,
                                                                        isIncremental(desc.parentSessionId) ? desc.parentSessionId : null,
                                                                        request.previewKind,
-                                                                       false);
+                                                                       request.asymmetric);
                     task.run();
                     break;
 
-                case ASYMMETRIC_SYNC_REQ:
-                    // forwarded sync request
-                    AsymmetricSyncRequest asymmetricSyncRequest = (AsymmetricSyncRequest) message.payload;
-                    logger.debug("Syncing {}", asymmetricSyncRequest);
-                    StreamingRepairTask asymmetricTask = new StreamingRepairTask(desc,
-                                                                                 asymmetricSyncRequest.initiator,
-                                                                                 asymmetricSyncRequest.fetchingNode,
-                                                                                 asymmetricSyncRequest.fetchFrom,
-                                                                                 asymmetricSyncRequest.ranges,
-                                                                                 isIncremental(desc.parentSessionId) ? desc.parentSessionId : null,
-                                                                                 asymmetricSyncRequest.previewKind,
-                                                                                 true);
-                    asymmetricTask.run();
-                    break;
-
                 case CLEANUP_MSG:
                     logger.debug("cleaning up repair");
                     CleanupMessage cleanup = (CleanupMessage) message.payload;
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 827dce3..fbfbac8 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -72,7 +72,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
 
     public void run()
     {
-        logger.info("[streaming task #{}] Performing streaming repair of {} ranges with {}", desc.sessionId, ranges.size(), dst);
+        logger.info("[streaming task #{}] Performing {}streaming repair of {} ranges with {}", desc.sessionId, asymmetric ? "asymmetric " : "", ranges.size(), dst);
         createStreamPlan(dst).execute();
     }
 
diff --git a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
index 181554a..b4e2d9c 100644
--- a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
@@ -63,7 +63,7 @@ public class SymmetricRemoteSyncTask extends SyncTask implements CompletableRemo
     {
         InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
 
-        SyncRequest request = new SyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind);
+        SyncRequest request = new SyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind, false);
         Preconditions.checkArgument(nodePair.coordinator.equals(request.src));
         String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst);
         logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java b/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
index ab294b9..8b123a7 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
@@ -18,12 +18,15 @@
 
 package org.apache.cassandra.repair.asymmetric;
 
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -34,19 +37,22 @@ import org.apache.cassandra.locator.InetAddressAndPort;
  */
 public class HostDifferences
 {
-    private final Map<InetAddressAndPort, List<Range<Token>>> perHostDifferences = new HashMap<>();
+    private final Map<InetAddressAndPort, NavigableSet<Range<Token>>> perHostDifferences = new HashMap<>();
+    private static final Comparator<Range<Token>> comparator = Comparator.comparing((Range<Token> o) -> o.left);
 
     /**
      * Adds a set of differences between the node this instance is tracking and endpoint
      */
-    public void add(InetAddressAndPort endpoint, List<Range<Token>> difference)
+    public void add(InetAddressAndPort endpoint, Collection<Range<Token>> difference)
     {
-        perHostDifferences.put(endpoint, difference);
+        TreeSet<Range<Token>> sortedDiffs = new TreeSet<>(comparator);
+        sortedDiffs.addAll(difference);
+        perHostDifferences.put(endpoint, sortedDiffs);
     }
 
     public void addSingleRange(InetAddressAndPort remoteNode, Range<Token> rangeToFetch)
     {
-        perHostDifferences.computeIfAbsent(remoteNode, (x) -> new ArrayList<>()).add(rangeToFetch);
+        perHostDifferences.computeIfAbsent(remoteNode, (x) -> new TreeSet<>(comparator)).add(rangeToFetch);
     }
 
     /**
@@ -54,12 +60,25 @@ public class HostDifferences
      */
     public boolean hasDifferencesFor(InetAddressAndPort node2, Range<Token> range)
     {
-        List<Range<Token>> differences = get(node2);
-        for (Range<Token> diff : differences)
+        NavigableSet<Range<Token>> differences = get(node2);
+
+        if (differences.size() > 0 && differences.last().isWrapAround() && differences.last().intersects(range))
+            return true;
+
+        for (Range<Token> unwrappedRange : range.unwrap())
         {
-            // if the other node has a diff for this range, we know they are not equal.
-            if (range.equals(diff) || range.intersects(diff))
-                return true;
+            Range<Token> startKey = differences.floor(unwrappedRange);
+            Iterator<Range<Token>> iter = startKey == null ? differences.iterator() : differences.tailSet(startKey, true).iterator();
+
+            while (iter.hasNext())
+            {
+                Range<Token> diff = iter.next();
+                // if the other node has a diff for this range, we know they are not equal.
+                if (unwrappedRange.equals(diff) || unwrappedRange.intersects(diff))
+                    return true;
+                if (unwrappedRange.right.compareTo(diff.left) < 0 && !unwrappedRange.isWrapAround())
+                    break;
+            }
         }
         return false;
     }
@@ -69,9 +88,9 @@ public class HostDifferences
         return perHostDifferences.keySet();
     }
 
-    public List<Range<Token>> get(InetAddressAndPort differingHost)
+    public NavigableSet<Range<Token>> get(InetAddressAndPort differingHost)
     {
-        return perHostDifferences.getOrDefault(differingHost, Collections.emptyList());
+        return perHostDifferences.getOrDefault(differingHost, Collections.emptyNavigableSet());
     }
 
     public String toString()
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java b/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
index 450336f..5b4de8c 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
@@ -18,8 +18,6 @@
 
 package org.apache.cassandra.repair.asymmetric;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableMap;
@@ -37,7 +35,7 @@ public class IncomingRepairStreamTracker
 {
     private static final Logger logger = LoggerFactory.getLogger(IncomingRepairStreamTracker.class);
     private final DifferenceHolder differences;
-    private final Map<Range<Token>, StreamFromOptions> incoming = new HashMap<>();
+    private final RangeMap<StreamFromOptions> incoming = new RangeMap<>();
 
     public IncomingRepairStreamTracker(DifferenceHolder differences)
     {
@@ -65,9 +63,7 @@ public class IncomingRepairStreamTracker
         logger.trace("adding incoming range {} from {}", range, streamFromNode);
         Set<Range<Token>> newInput = RangeDenormalizer.denormalize(range, incoming);
         for (Range<Token> input : newInput)
-        {
             incoming.computeIfAbsent(input, (newRange) -> new StreamFromOptions(differences, newRange)).add(streamFromNode);
-        }
     }
 
     public ImmutableMap<Range<Token>, StreamFromOptions> getIncoming()
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java b/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java
index f692dd6..2a29871 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java
@@ -18,11 +18,9 @@
 
 package org.apache.cassandra.repair.asymmetric;
 
-import java.net.InetAddress;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
@@ -33,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.RingPosition;
 import org.apache.cassandra.dht.Token;
 
 public class RangeDenormalizer
@@ -45,26 +44,17 @@ public class RangeDenormalizer
      * It makes sure that if there is an intersection between {{range}} and some of the ranges in {{incoming.keySet()}}
      * we know that all intersections are keys in the updated {{incoming}}
      */
-    public static Set<Range<Token>> denormalize(Range<Token> range, Map<Range<Token>, StreamFromOptions> incoming)
+    public static Set<Range<Token>> denormalize(Range<Token> range, RangeMap<StreamFromOptions> incoming)
     {
         logger.trace("Denormalizing range={} incoming={}", range, incoming);
-        Set<Range<Token>> existingRanges = new HashSet<>(incoming.keySet());
-        Map<Range<Token>, StreamFromOptions> existingOverlappingRanges = new HashMap<>();
-        // remove all overlapping ranges from the incoming map
-        for (Range<Token> existingRange : existingRanges)
-        {
-            if (range.intersects(existingRange))
-                existingOverlappingRanges.put(existingRange, incoming.remove(existingRange));
-        }
+        Set<Map.Entry<Range<Token>, StreamFromOptions>> existingOverlappingRanges = incoming.removeIntersecting(range);
 
-        Set<Range<Token>> intersections = intersection(existingRanges, range);
-        Set<Range<Token>> newExisting = Sets.union(subtractFromAllRanges(existingOverlappingRanges.keySet(), range), intersections);
-        Set<Range<Token>> newInput = Sets.union(range.subtractAll(existingOverlappingRanges.keySet()), intersections);
-        assertNonOverLapping(newExisting);
-        assertNonOverLapping(newInput);
+        Set<Range<Token>> intersections = intersection(existingOverlappingRanges, range);
+        Set<Range<Token>> newExisting = Sets.union(subtractFromAllRanges(existingOverlappingRanges, range), intersections);
+        Set<Range<Token>> newInput = Sets.union(subtractAll(existingOverlappingRanges, range), intersections);
         for (Range<Token> r : newExisting)
         {
-            for (Map.Entry<Range<Token>, StreamFromOptions> entry : existingOverlappingRanges.entrySet())
+            for (Map.Entry<Range<Token>, StreamFromOptions> entry : existingOverlappingRanges)
             {
                 if (r.intersects(entry.getKey()))
                     incoming.put(r, entry.getValue().copy(r));
@@ -72,7 +62,6 @@ public class RangeDenormalizer
         }
         logger.trace("denormalized {} to {}", range, newInput);
         logger.trace("denormalized incoming to {}", incoming);
-        assertNonOverLapping(incoming.keySet());
         return newInput;
     }
 
@@ -87,39 +76,48 @@ public class RangeDenormalizer
      *
      */
     @VisibleForTesting
-    static Set<Range<Token>> subtractFromAllRanges(Collection<Range<Token>> ranges, Range<Token> range)
+    static Set<Range<Token>> subtractFromAllRanges(Collection<Map.Entry<Range<Token>, StreamFromOptions>> ranges, Range<Token> range)
     {
         Set<Range<Token>> result = new HashSet<>();
-        for (Range<Token> r : ranges)
-            result.addAll(r.subtract(range)); // subtract can return two ranges if we remove the middle part
+        for (Map.Entry<Range<Token>, ?> r : ranges)
+            result.addAll(r.getKey().subtract(range)); // subtract can return two ranges if we remove the middle part
         return result;
     }
 
     /**
-     * Makes sure non of the input ranges are overlapping
+     * Returns all intersections between the ranges in ranges and the given range
      */
-    private static void assertNonOverLapping(Set<Range<Token>> ranges)
+    private static Set<Range<Token>> intersection(Set<Map.Entry<Range<Token>, StreamFromOptions>> ranges, Range<Token> range)
     {
-        List<Range<Token>> sortedRanges = Range.sort(ranges);
-        Token lastToken = null;
-        for (Range<Token> range : sortedRanges)
-        {
-            if (lastToken != null && lastToken.compareTo(range.left) > 0)
-            {
-                throw new AssertionError("Ranges are overlapping: "+ranges);
-            }
-            lastToken = range.right;
-        }
+        Set<Range<Token>> result = new HashSet<>();
+        for (Map.Entry<Range<Token>, StreamFromOptions> r : ranges)
+            result.addAll(range.intersectionWith(r.getKey()));
+        return result;
     }
 
     /**
-     * Returns all intersections between the ranges in ranges and the given range
+     * copied from Range - need to iterate over the map entries
      */
-    private static Set<Range<Token>> intersection(Collection<Range<Token>> ranges, Range<Token> range)
+    public static Set<Range<Token>> subtractAll(Collection<Map.Entry<Range<Token>, StreamFromOptions>> ranges, Range<Token> toSubtract)
     {
         Set<Range<Token>> result = new HashSet<>();
-        for (Range<Token> r : ranges)
-            result.addAll(range.intersectionWith(r));
+        result.add(toSubtract);
+        for(Map.Entry<Range<Token>, StreamFromOptions> range : ranges)
+        {
+            result = substractAllFromToken(result, range.getKey());
+        }
+
+        return result;
+    }
+
+    private static <T extends RingPosition<T>> Set<Range<T>> substractAllFromToken(Set<Range<T>> ranges, Range<T> subtract)
+    {
+        Set<Range<T>> result = new HashSet<>();
+        for(Range<T> range : ranges)
+        {
+            result.addAll(range.subtract(subtract));
+        }
+
         return result;
     }
 }
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/RangeMap.java b/src/java/org/apache/cassandra/repair/asymmetric/RangeMap.java
new file mode 100644
index 0000000..f957b1b
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/asymmetric/RangeMap.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+public class RangeMap<T> implements Map<Range<Token>, T>
+{
+    private static final Comparator<Range<Token>> comparator = Comparator.comparing((Range<Token> o) -> o.left);
+
+    private final NavigableMap<Range<Token>, T> byStart;
+
+    public RangeMap()
+    {
+        byStart = new TreeMap<>(comparator);
+    }
+
+    public int size()
+    {
+        return byStart.size();
+    }
+
+    public boolean isEmpty()
+    {
+        return byStart.isEmpty();
+    }
+
+    public boolean containsKey(Object key)
+    {
+        return byStart.containsKey(key);
+    }
+
+    public boolean containsValue(Object value)
+    {
+        return byStart.containsValue(value);
+    }
+
+    public T get(Object key)
+    {
+        return byStart.get(key);
+    }
+
+    public T put(Range<Token> key, T value)
+    {
+        assertNonIntersecting(key);
+        return byStart.put(key, value);
+    }
+
+    private void assertNonIntersecting(Range<Token> range)
+    {
+        // todo: wraparound
+        Range<Token> before = byStart.floorKey(range);
+        Range<Token> after = byStart.ceilingKey(range);
+        assert before == null || !before.intersects(range);
+        assert after == null || !after.intersects(range);
+    }
+
+    public T remove(Object key)
+    {
+        return byStart.remove(key);
+    }
+
+    public void putAll(Map<? extends Range<Token>, ? extends T> m)
+    {
+        byStart.putAll(m);
+    }
+
+    public void clear()
+    {
+        byStart.clear();
+    }
+
+    public Set<Range<Token>> keySet()
+    {
+        return byStart.keySet();
+    }
+
+    public Collection<T> values()
+    {
+        return byStart.values();
+    }
+
+    public Set<Map.Entry<Range<Token>, T>> entrySet()
+    {
+        return byStart.entrySet();
+    }
+
+    /**
+     * might return duplicate entries if range.isWrapAround()
+     *
+     * don't depend on the order of the entries returned
+     */
+    @VisibleForTesting
+    Iterator<Map.Entry<Range<Token>, T>> intersectingEntryIterator(Range<Token> range)
+    {
+        return range.isWrapAround() ? new WrappingIntersectingIterator(range) : new IntersectingIterator(range);
+    }
+
+    public Set<Map.Entry<Range<Token>, T>> removeIntersecting(Range<Token> range)
+    {
+        Iterator<Map.Entry<Range<Token>, T>> iter = intersectingEntryIterator(range);
+        Set<Map.Entry<Range<Token>, T>> intersecting = new HashSet<>();
+        while (iter.hasNext())
+        {
+            Map.Entry<Range<Token>, T> entry = iter.next();
+            intersecting.add(entry);
+        }
+        for (Map.Entry<Range<Token>, T> entry : intersecting)
+            byStart.remove(entry.getKey());
+        return intersecting;
+    }
+
+    private class WrappingIntersectingIterator extends AbstractIterator<Map.Entry<Range<Token>, T>>
+    {
+        private final Iterator<Iterator<Map.Entry<Range<Token>, T>>> iterators;
+        private Iterator<Map.Entry<Range<Token>, T>> currentIter;
+
+        public WrappingIntersectingIterator(Range<Token> range)
+        {
+            List<Iterator<Map.Entry<Range<Token>, T>>> iters = new ArrayList<>(2);
+            for (Range<Token> unwrapped : range.unwrap())
+                iters.add((new IntersectingIterator(unwrapped)));
+            iterators = iters.iterator();
+        }
+        protected Map.Entry<Range<Token>, T> computeNext()
+        {
+            if ((currentIter == null || !currentIter.hasNext()) && iterators.hasNext())
+                currentIter = iterators.next();
+            if (currentIter != null && currentIter.hasNext())
+                return currentIter.next();
+            return endOfData();
+        }
+    }
+
+    private class IntersectingIterator extends AbstractIterator<Map.Entry<Range<Token>, T>>
+    {
+        private final Iterator<Map.Entry<Range<Token>, T>> tailIterator;
+        private final Range<Token> range;
+        // since we guarantee no ranges overlap in byStart, we know the last entry is possibly the wrap around range
+        private boolean shouldReturnLast = false;
+
+        public IntersectingIterator(Range<Token> range)
+        {
+            Range<Token> startKey = byStart.floorKey(range);
+            tailIterator = startKey == null ? byStart.entrySet().iterator() :
+                                              byStart.tailMap(startKey, true).entrySet().iterator();
+            Range<Token> last = byStart.isEmpty() ? null : byStart.lastKey();
+            if (last != null && last.isWrapAround() && last.intersects(range))
+                shouldReturnLast = true;
+            this.range = range;
+        }
+
+        protected Map.Entry<Range<Token>, T> computeNext()
+        {
+            if (shouldReturnLast)
+            {
+                shouldReturnLast = false;
+                return new Entry<>(byStart.lastEntry());
+            }
+            while (tailIterator.hasNext())
+            {
+                Entry<Range<Token>, T> candidateNext = new Entry<>(tailIterator.next());
+                Range<Token> candidateRange = candidateNext.getKey();
+
+                if (candidateRange.isWrapAround()) // we know we already returned any wrapping range
+                    continue;
+
+                if (candidateRange.left.compareTo(range.right) >= 0 && (!range.isWrapAround())) // range is unwrapped, but that means one range has right == min token and is still wrapping
+                    return endOfData();
+
+                if (range.left.compareTo(candidateRange.right) >= 0)
+                    continue;
+
+                return candidateNext;
+            }
+            return endOfData();
+        }
+    }
+
+    public String toString()
+    {
+        return byStart.toString();
+    }
+
+    static class Entry<K, V> implements Map.Entry<K, V>
+    {
+        private final V v;
+        private final K k;
+
+        Entry(K key, V val)
+        {
+            this.k = key;
+            this.v = val;
+        }
+
+        Entry(Map.Entry<K, V> toClone)
+        {
+            this(toClone.getKey(), toClone.getValue());
+        }
+        public K getKey()
+        {
+            return k;
+        }
+
+        public V getValue()
+        {
+            return v;
+        }
+
+        public V setValue(V value)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (!(o instanceof Map.Entry)) return false;
+            Map.Entry<?, ?> entry = (Map.Entry<?, ?>) o;
+            return v.equals(entry.getValue()) &&
+                   k.equals(entry.getKey());
+        }
+
+        public int hashCode()
+        {
+            return Objects.hash(v, k);
+        }
+
+        public String toString()
+        {
+            return "Entry{" +
+                   "v=" + v +
+                   ", k=" + k +
+                   '}';
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
index c7d45bf..1c4bca7 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
@@ -18,7 +18,10 @@
 
 package org.apache.cassandra.repair.asymmetric;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -50,7 +53,7 @@ public class ReduceHelper
     public static ImmutableMap<InetAddressAndPort, HostDifferences> reduce(DifferenceHolder differences, PreferedNodeFilter filter)
     {
         Map<InetAddressAndPort, IncomingRepairStreamTracker> trackers = createIncomingRepairStreamTrackers(differences);
-        Map<InetAddressAndPort, Integer> outgoingStreamCounts = new HashMap<>();
+
         ImmutableMap.Builder<InetAddressAndPort, HostDifferences> mapBuilder = ImmutableMap.builder();
         for (Map.Entry<InetAddressAndPort, IncomingRepairStreamTracker> trackerEntry : trackers.entrySet())
         {
@@ -59,7 +62,10 @@ public class ReduceHelper
             for (Map.Entry<Range<Token>, StreamFromOptions> entry : tracker.getIncoming().entrySet())
             {
                 Range<Token> rangeToFetch = entry.getKey();
-                for (InetAddressAndPort remoteNode : pickLeastStreaming(trackerEntry.getKey(), entry.getValue(), outgoingStreamCounts, filter))
+                // StreamFromOptions contains a Set<Set<InetAddress>> with endpoints we need to stream
+                // rangeToFetch from - if the inner set size > 1 means those endpoints are identical
+                // for the range. pickConsistent picks a single endpoint from each of these sets.
+                for (InetAddressAndPort remoteNode : pickConsistent(trackerEntry.getKey(), entry.getValue(), filter))
                     rangesToFetch.addSingleRange(remoteNode, rangeToFetch);
             }
             mapBuilder.put(trackerEntry.getKey(), rangesToFetch);
@@ -78,7 +84,7 @@ public class ReduceHelper
             HostDifferences hostDifferences = differences.get(hostWithDifference);
             for (InetAddressAndPort differingHost : hostDifferences.hosts())
             {
-                List<Range<Token>> differingRanges = hostDifferences.get(differingHost);
+                Iterable<Range<Token>> differingRanges = hostDifferences.get(differingHost);
                 // hostWithDifference has mismatching ranges differingRanges with differingHost:
                 for (Range<Token> range : differingRanges)
                 {
@@ -99,38 +105,37 @@ public class ReduceHelper
         return trackers.computeIfAbsent(host, (h) -> new IncomingRepairStreamTracker(differences));
     }
 
-    // greedily pick the nodes doing the least amount of streaming
-    private static Collection<InetAddressAndPort> pickLeastStreaming(InetAddressAndPort streamingNode,
-                                                              StreamFromOptions toStreamFrom,
-                                                              Map<InetAddressAndPort, Integer> outgoingStreamCounts,
-                                                              PreferedNodeFilter filter)
+    private static final Comparator<InetAddressAndPort> comparator = Comparator.comparing(InetAddressAndPort::getHostAddressAndPort);
+    /**
+     * Consistently picks the node after the streaming node to stream from
+     *
+     * this is done to reduce the amount of sstables created on the receiving node
+     *
+     * todo: note that this can be improved - if we have a case like:
+     *         addr3 will stream range1 from addr1 or addr2
+     *                           range2 from addr1
+     *       in a perfect world we would stream both range1 and range2 from addr1 - but in this case we might stream
+     *       range1 from addr2 depending on how the addresses sort
+     */
+    private static Collection<InetAddressAndPort> pickConsistent(InetAddressAndPort streamingNode,
+                                                                 StreamFromOptions toStreamFrom,
+                                                                 PreferedNodeFilter filter)
     {
         Set<InetAddressAndPort> retSet = new HashSet<>();
         for (Set<InetAddressAndPort> toStream : toStreamFrom.allStreams())
         {
-            InetAddressAndPort candidate = null;
-            Set<InetAddressAndPort> prefered = filter.apply(streamingNode, toStream);
-            for (InetAddressAndPort node : prefered)
-            {
-                if (candidate == null || outgoingStreamCounts.getOrDefault(candidate, 0) > outgoingStreamCounts.getOrDefault(node, 0))
-                {
-                    candidate = node;
-                }
-            }
-            // ok, found no prefered hosts, try all of them
-            if (candidate == null)
-            {
-                for (InetAddressAndPort node : toStream)
-                {
-                    if (candidate == null || outgoingStreamCounts.getOrDefault(candidate, 0) > outgoingStreamCounts.getOrDefault(node, 0))
-                    {
-                        candidate = node;
-                    }
-                }
-            }
-            assert candidate != null;
-            outgoingStreamCounts.put(candidate, outgoingStreamCounts.getOrDefault(candidate, 0) + 1);
-            retSet.add(candidate);
+            List<InetAddressAndPort> toSearch = new ArrayList<>(filter.apply(streamingNode, toStream));
+            if (toSearch.isEmpty())
+                toSearch = new ArrayList<>(toStream);
+
+            toSearch.sort(comparator);
+            int pos = Collections.binarySearch(toSearch, streamingNode, comparator);
+            assert pos < 0;
+            pos = -pos - 1;
+            if (pos == toSearch.size())
+                retSet.add(toSearch.get(0));
+            else
+                retSet.add(toSearch.get(pos));
         }
         return retSet;
     }
diff --git a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
deleted file mode 100644
index eacc285..0000000
--- a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair.messages;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.repair.RepairJobDesc;
-import org.apache.cassandra.streaming.PreviewKind;
-
-import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer;
-
-public class AsymmetricSyncRequest extends RepairMessage
-{
-    public final InetAddressAndPort initiator;
-    public final InetAddressAndPort fetchingNode;
-    public final InetAddressAndPort fetchFrom;
-    public final Collection<Range<Token>> ranges;
-    public final PreviewKind previewKind;
-
-    public AsymmetricSyncRequest(RepairJobDesc desc, InetAddressAndPort initiator, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, Collection<Range<Token>> ranges, PreviewKind previewKind)
-    {
-        super(desc);
-        this.initiator = initiator;
-        this.fetchingNode = fetchingNode;
-        this.fetchFrom = fetchFrom;
-        this.ranges = ranges;
-        this.previewKind = previewKind;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (!(o instanceof AsymmetricSyncRequest))
-            return false;
-        AsymmetricSyncRequest req = (AsymmetricSyncRequest)o;
-        return desc.equals(req.desc) &&
-               initiator.equals(req.initiator) &&
-               fetchingNode.equals(req.fetchingNode) &&
-               fetchFrom.equals(req.fetchFrom) &&
-               ranges.equals(req.ranges);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(desc, initiator, fetchingNode, fetchFrom, ranges);
-    }
-
-    public static final IVersionedSerializer<AsymmetricSyncRequest> serializer = new IVersionedSerializer<AsymmetricSyncRequest>()
-    {
-        public void serialize(AsymmetricSyncRequest message, DataOutputPlus out, int version) throws IOException
-        {
-            RepairJobDesc.serializer.serialize(message.desc, out, version);
-            inetAddressAndPortSerializer.serialize(message.initiator, out, version);
-            inetAddressAndPortSerializer.serialize(message.fetchingNode, out, version);
-            inetAddressAndPortSerializer.serialize(message.fetchFrom, out, version);
-            out.writeInt(message.ranges.size());
-            for (Range<Token> range : message.ranges)
-            {
-                IPartitioner.validate(range);
-                AbstractBounds.tokenSerializer.serialize(range, out, version);
-            }
-            out.writeInt(message.previewKind.getSerializationVal());
-        }
-
-        public AsymmetricSyncRequest deserialize(DataInputPlus in, int version) throws IOException
-        {
-            RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
-            InetAddressAndPort owner = inetAddressAndPortSerializer.deserialize(in, version);
-            InetAddressAndPort src = inetAddressAndPortSerializer.deserialize(in, version);
-            InetAddressAndPort dst = inetAddressAndPortSerializer.deserialize(in, version);
-            int rangesCount = in.readInt();
-            List<Range<Token>> ranges = new ArrayList<>(rangesCount);
-            for (int i = 0; i < rangesCount; ++i)
-                ranges.add((Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, IPartitioner.global(), version));
-            PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
-            return new AsymmetricSyncRequest(desc, owner, src, dst, ranges, previewKind);
-        }
-
-        public long serializedSize(AsymmetricSyncRequest message, int version)
-        {
-            long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
-            size += inetAddressAndPortSerializer.serializedSize(message.initiator, version);
-            size += inetAddressAndPortSerializer.serializedSize(message.fetchingNode, version);
-            size += inetAddressAndPortSerializer.serializedSize(message.fetchFrom, version);
-            size += TypeSizes.sizeof(message.ranges.size());
-            for (Range<Token> range : message.ranges)
-                size += AbstractBounds.tokenSerializer.serializedSize(range, version);
-            size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
-            return size;
-        }
-    };
-
-    public String toString()
-    {
-        return "AsymmetricSyncRequest{" +
-               "initiator=" + initiator +
-               ", fetchingNode=" + fetchingNode +
-               ", fetchFrom=" + fetchFrom +
-               ", ranges=" + ranges +
-               ", previewKind=" + previewKind +
-               ", desc="+desc+
-               '}';
-    }
-}
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index a76e6c0..c2e2c18 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -390,7 +390,22 @@ public class RepairOption
 
     public boolean optimiseStreams()
     {
-        return optimiseStreams;
+        if(optimiseStreams)
+            return true;
+
+        if (isPullRepair() || isForcedRepair())
+            return false;
+
+        if (isIncremental() && DatabaseDescriptor.autoOptimiseIncRepairStreams())
+            return true;
+
+        if (isPreview() && DatabaseDescriptor.autoOptimisePreviewRepairStreams())
+            return true;
+
+        if (!isIncremental() && DatabaseDescriptor.autoOptimiseFullRepairStreams())
+            return true;
+
+        return false;
     }
 
     public boolean ignoreUnreplicatedKeyspaces()
@@ -413,7 +428,7 @@ public class RepairOption
                ", # of ranges: " + ranges.size() +
                ", pull repair: " + pullRepair +
                ", force repair: " + forceRepair +
-               ", optimise streams: "+ optimiseStreams +
+               ", optimise streams: "+ optimiseStreams() +
                ", ignore unreplicated keyspaces: "+ ignoreUnreplicatedKeyspaces +
                ')';
     }
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
index 341455f..9886abc 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -50,8 +50,15 @@ public class SyncRequest extends RepairMessage
     public final InetAddressAndPort dst;
     public final Collection<Range<Token>> ranges;
     public final PreviewKind previewKind;
+    public final boolean asymmetric;
 
-   public SyncRequest(RepairJobDesc desc, InetAddressAndPort initiator, InetAddressAndPort src, InetAddressAndPort dst, Collection<Range<Token>> ranges, PreviewKind previewKind)
+   public SyncRequest(RepairJobDesc desc,
+                      InetAddressAndPort initiator,
+                      InetAddressAndPort src,
+                      InetAddressAndPort dst,
+                      Collection<Range<Token>> ranges,
+                      PreviewKind previewKind,
+                      boolean asymmetric)
    {
         super(desc);
         this.initiator = initiator;
@@ -59,6 +66,7 @@ public class SyncRequest extends RepairMessage
         this.dst = dst;
         this.ranges = ranges;
         this.previewKind = previewKind;
+        this.asymmetric = asymmetric;
     }
 
     @Override
@@ -72,7 +80,8 @@ public class SyncRequest extends RepairMessage
                src.equals(req.src) &&
                dst.equals(req.dst) &&
                ranges.equals(req.ranges) &&
-               previewKind == req.previewKind;
+               previewKind == req.previewKind &&
+               asymmetric == req.asymmetric;
     }
 
     @Override
@@ -96,6 +105,7 @@ public class SyncRequest extends RepairMessage
                 AbstractBounds.tokenSerializer.serialize(range, out, version);
             }
             out.writeInt(message.previewKind.getSerializationVal());
+            out.writeBoolean(message.asymmetric);
         }
 
         public SyncRequest deserialize(DataInputPlus in, int version) throws IOException
@@ -109,7 +119,8 @@ public class SyncRequest extends RepairMessage
             for (int i = 0; i < rangesCount; ++i)
                 ranges.add((Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, IPartitioner.global(), version));
             PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
-            return new SyncRequest(desc, owner, src, dst, ranges, previewKind);
+            boolean asymmetric = in.readBoolean();
+            return new SyncRequest(desc, owner, src, dst, ranges, previewKind, asymmetric);
         }
 
         public long serializedSize(SyncRequest message, int version)
@@ -120,6 +131,7 @@ public class SyncRequest extends RepairMessage
             for (Range<Token> range : message.ranges)
                 size += AbstractBounds.tokenSerializer.serializedSize(range, version);
             size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
+            size += TypeSizes.sizeof(message.asymmetric);
             return size;
         }
     };
@@ -133,6 +145,7 @@ public class SyncRequest extends RepairMessage
                 ", dst=" + dst +
                 ", ranges=" + ranges +
                 ", previewKind=" + previewKind +
+                ", asymmetric=" + asymmetric +
                 "} " + super.toString();
     }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 9c6499a..8baf7fe 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -5767,4 +5767,34 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return outstanding.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().toString(),
                                                                         e -> e.getValue().stream().map(InetAddressAndPort::toString).collect(Collectors.toSet())));
     }
+
+    public boolean autoOptimiseIncRepairStreams()
+    {
+        return DatabaseDescriptor.autoOptimiseIncRepairStreams();
+    }
+
+    public void setAutoOptimiseIncRepairStreams(boolean enabled)
+    {
+        DatabaseDescriptor.setAutoOptimiseIncRepairStreams(enabled);
+    }
+
+    public boolean autoOptimiseFullRepairStreams()
+    {
+        return DatabaseDescriptor.autoOptimiseFullRepairStreams();
+    }
+
+    public void setAutoOptimiseFullRepairStreams(boolean enabled)
+    {
+        DatabaseDescriptor.setAutoOptimiseFullRepairStreams(enabled);
+    }
+
+    public boolean autoOptimisePreviewRepairStreams()
+    {
+        return DatabaseDescriptor.autoOptimisePreviewRepairStreams();
+    }
+
+    public void setAutoOptimisePreviewRepairStreams(boolean enabled)
+    {
+        DatabaseDescriptor.setAutoOptimisePreviewRepairStreams(enabled);
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 0c4b5b0..1df3146 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -826,4 +826,12 @@ public interface StorageServiceMBean extends NotificationEmitter
     @Deprecated
     public Map<String, Set<InetAddress>> getOutstandingSchemaVersions();
     public Map<String, Set<String>> getOutstandingSchemaVersionsWithPort();
+
+    // see CASSANDRA-3200
+    public boolean autoOptimiseIncRepairStreams();
+    public void setAutoOptimiseIncRepairStreams(boolean enabled);
+    public boolean autoOptimiseFullRepairStreams();
+    public void setAutoOptimiseFullRepairStreams(boolean enabled);
+    public boolean autoOptimisePreviewRepairStreams();
+    public void setAutoOptimisePreviewRepairStreams(boolean enabled);
 }
diff --git a/test/data/serialization/4.0/service.SyncRequest.bin b/test/data/serialization/4.0/service.SyncRequest.bin
index b0cc44e..17bb014 100644
Binary files a/test/data/serialization/4.0/service.SyncRequest.bin and b/test/data/serialization/4.0/service.SyncRequest.bin differ
diff --git a/test/distributed/org/apache/cassandra/distributed/test/OptimiseStreamsRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/OptimiseStreamsRepairTest.java
new file mode 100644
index 0000000..8a1c1d7
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/OptimiseStreamsRepairTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.AsymmetricRemoteSyncTask;
+import org.apache.cassandra.repair.LocalSyncTask;
+import org.apache.cassandra.repair.RepairJob;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.repair.SyncTask;
+import org.apache.cassandra.repair.TreeResponse;
+import org.apache.cassandra.streaming.PreviewKind;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class OptimiseStreamsRepairTest extends TestBaseImpl
+{
+    @Test
+    public void testBasic() throws Exception
+    {
+        try(Cluster cluster = init(Cluster.build(3)
+                                          .withInstanceInitializer(BBHelper::install)
+                                          .withConfig(config -> config.set("hinted_handoff_enabled", false)
+                                                                      .with(GOSSIP)
+                                                                      .with(NETWORK))
+                                          .start()))
+        {
+            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int) with compaction={'class': 'SizeTieredCompactionStrategy'}");
+            for (int i = 0; i < 100000; i++)
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (id, t) values (?,?)", ConsistencyLevel.ALL, i, i);
+            cluster.forEach((i) -> i.flush(KEYSPACE));
+
+            cluster.get(2).shutdown().get();
+
+            for (int i = 0; i < 2000; i++)
+                cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, t) values (?,?)", ConsistencyLevel.QUORUM, i, i * 2 + 2);
+
+            cluster.get(2).startup();
+            Thread.sleep(10000);
+            cluster.forEach(c -> c.flush(KEYSPACE));
+            cluster.forEach(c -> c.forceCompact(KEYSPACE, "tbl"));
+
+            NodeToolResult res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "-os");
+            res.asserts().success();
+
+            res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "-vd");
+            res.asserts().success();
+            res.asserts().notificationContains("Repaired data is in sync");
+
+            res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "--preview", "--full");
+            res.asserts().success();
+            res.asserts().notificationContains("Previewed data was in sync");
+        }
+    }
+
+    public static class BBHelper
+    {
+        public static void install(ClassLoader cl, int id)
+        {
+            new ByteBuddy().rebase(RepairJob.class)
+                           .method(named("createOptimisedSyncingSyncTasks"))
+                           .intercept(MethodDelegation.to(BBHelper.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        public static List<SyncTask> createOptimisedSyncingSyncTasks(RepairJobDesc desc,
+                                                                     List<TreeResponse> trees,
+                                                                     InetAddressAndPort local,
+                                                                     Predicate<InetAddressAndPort> isTransient,
+                                                                     Function<InetAddressAndPort, String> getDC,
+                                                                     boolean isIncremental,
+                                                                     PreviewKind previewKind,
+                                                                     @SuperCall Callable<List<SyncTask>> zuperCall)
+        {
+            List<SyncTask> tasks = null;
+            try
+            {
+                tasks = zuperCall.call();
+                verifySyncTasks(tasks);
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+            return tasks;
+        }
+
+        private static void verifySyncTasks(List<SyncTask> tasks) throws UnknownHostException
+        {
+            Map<InetAddressAndPort, Map<InetAddressAndPort, List<Range<Token>>>> fetching = new HashMap<>();
+            for (SyncTask task : tasks)
+            {
+                if (task instanceof LocalSyncTask)
+                {
+                    assertFalse(((LocalSyncTask)task).transferRanges);
+                    assertTrue(((LocalSyncTask)task).requestRanges);
+                }
+                else
+                    assertTrue(task instanceof AsymmetricRemoteSyncTask);
+
+                Map<InetAddressAndPort, List<Range<Token>>> fetch = fetching.computeIfAbsent(task.nodePair().coordinator, k -> new HashMap<>());
+                fetch.computeIfAbsent(task.nodePair().peer, k -> new ArrayList<>()).addAll(task.rangesToSync);
+            }
+            // 127.0.0.2 is the node out of sync - make sure it does not receive multiple copies of the same range from the other nodes;
+            Map<InetAddressAndPort, List<Range<Token>>> node2 = fetching.get(InetAddressAndPort.getByName("127.0.0.2"));
+            Set<Range<Token>> allRanges = new HashSet<>();
+            node2.values().forEach(ranges -> ranges.forEach(r -> assertTrue(allRanges.add(r))));
+
+            // 127.0.0.2 should stream the same ranges to .1 and .3
+            Set<Range<Token>> node2ToNode1 = new HashSet<>(fetching.get(InetAddressAndPort.getByName("127.0.0.1")).get(InetAddressAndPort.getByName("127.0.0.2")));
+            Set<Range<Token>> node2ToNode3 = new HashSet<>(fetching.get(InetAddressAndPort.getByName("127.0.0.3")).get(InetAddressAndPort.getByName("127.0.0.2")));
+            assertEquals(node2ToNode1, allRanges);
+            assertEquals(node2ToNode3, allRanges);
+        }
+    }
+
+    @Test
+    public void randomTest() throws IOException
+    {
+        try(Cluster cluster = init(Cluster.build(3)
+                                          .withConfig(config -> config.set("hinted_handoff_enabled", false)
+                                                                      .with(GOSSIP)
+                                                                      .with(NETWORK))
+                                          .start()))
+        {
+            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int) with compaction={'class': 'SizeTieredCompactionStrategy'}");
+            for (int i = 0; i < 10_000; i++)
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (id, t) values (?,?)", ConsistencyLevel.ALL, i, i);
+            cluster.forEach((i) -> i.flush(KEYSPACE));
+
+            Random r = new Random();
+            for (int i = 0; i < 20000; i++)
+                for (int j = 1; j <= 3; j++)
+                    cluster.get(j).executeInternal("INSERT INTO "+KEYSPACE+".tbl (id, t) values (?,?)", r.nextInt(), i * 2 + 2);
+
+            NodeToolResult res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "-os");
+            res.asserts().success();
+
+            res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "-vd");
+            res.asserts().success();
+            res.asserts().notificationContains("Repaired data is in sync");
+
+            res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "--preview", "--full");
+            res.asserts().success();
+            res.asserts().notificationContains("Previewed data was in sync");
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
index 9887d38..87863b5 100644
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@ -73,6 +73,7 @@ import org.apache.cassandra.utils.asserts.SyncTaskListAssert;
 import static org.apache.cassandra.utils.asserts.SyncTaskAssert.assertThat;
 import static org.apache.cassandra.utils.asserts.SyncTaskListAssert.assertThat;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
 
 public class RepairJobTest
 {
@@ -610,7 +611,7 @@ public class RepairJobTest
     }
 
     @Test
-    public void testOptimizedCreateStandardSyncTasksAllDifferent()
+    public void testOptimisedCreateStandardSyncTasksAllDifferent()
     {
         List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, RANGE_1, "one", RANGE_2, "one", RANGE_3, "one"),
                                                          treeResponse(addr2, RANGE_1, "two", RANGE_2, "two", RANGE_3, "two"),
@@ -636,8 +637,17 @@ public class RepairJobTest
     }
 
     @Test
-    public void testOptimizedCreateStandardSyncTasks()
+    public void testOptimisedCreateStandardSyncTasks()
     {
+        /*
+        addr1 will stream range1 from addr3
+                          range2 from addr2 or addr3
+        addr2 will stream range1 from addr3
+                          range2 from addr1
+        addr3 will stream range1 from addr1 or addr2
+                          range2 from addr1
+         */
+
         List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, RANGE_1, "one", RANGE_2, "one"),
                                                          treeResponse(addr2, RANGE_1, "one", RANGE_2, "two"),
                                                          treeResponse(addr3, RANGE_1, "three", RANGE_2, "two"));
@@ -652,21 +662,23 @@ public class RepairJobTest
 
         assertThat(tasks.values()).areAllInstanceOf(AsymmetricRemoteSyncTask.class);
 
-        assertThat(tasks.get(pair(addr1, addr3)).rangesToSync).containsExactly(RANGE_1);
+        // addr1 streams range1 from addr3:
+        assertThat(tasks.get(pair(addr1, addr3)).rangesToSync).contains(RANGE_1);
         // addr1 can get range2 from either addr2 or addr3 but not from both
         assertStreamRangeFromEither(tasks, RANGE_2, addr1, addr2, addr3);
 
-        assertThat(tasks.get(pair(addr2, addr3)).rangesToSync).containsExactly(RANGE_1);
-        assertThat(tasks.get(pair(addr2, addr1)).rangesToSync).containsExactly(RANGE_2);
-
+        // addr2 streams range1 from addr3
+        assertThat(tasks.get(pair(addr2, addr3)).rangesToSync).contains(RANGE_1);
+        // addr2 streams range2 from addr1
+        assertThat(tasks.get(pair(addr2, addr1)).rangesToSync).contains(RANGE_2);
         // addr3 can get range1 from either addr1 or addr2 but not from both
         assertStreamRangeFromEither(tasks, RANGE_1, addr3, addr2, addr1);
-
-        assertThat(tasks.get(pair(addr3, addr1)).rangesToSync).containsExactly(RANGE_2);
+        // addr3 streams range2 from addr1
+        assertThat(tasks.get(pair(addr3, addr1)).rangesToSync).contains(RANGE_2);
     }
 
     @Test
-    public void testOptimizedCreateStandardSyncTasksWithTransient()
+    public void testOptimisedCreateStandardSyncTasksWithTransient()
     {
         List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, RANGE_1, "same", RANGE_2, "same", RANGE_3, "same"),
                                                          treeResponse(addr2, RANGE_1, "different", RANGE_2, "same", RANGE_3, "different"),
@@ -681,7 +693,6 @@ public class RepairJobTest
                                                                                             false,
                                                                                             PreviewKind.ALL));
 
-        assertThat(tasks).hasSize(3);
         SyncTask task = tasks.get(pair(addr1, addr2));
 
         assertThat(task)
@@ -698,23 +709,21 @@ public class RepairJobTest
     public static void assertStreamRangeFromEither(Map<SyncNodePair, SyncTask> tasks, Range<Token> range,
                                                    InetAddressAndPort target, InetAddressAndPort either, InetAddressAndPort or)
     {
-        InetAddressAndPort streamsFrom;
-        InetAddressAndPort doesntStreamFrom;
-        if (tasks.containsKey(pair(target, either)) && tasks.get(pair(target, either)).rangesToSync.contains(range))
+        SyncTask task1 = tasks.get(pair(target, either));
+        SyncTask task2 = tasks.get(pair(target, or));
+
+        boolean foundRange = false;
+        if (task1 != null && task1.rangesToSync.contains(range))
         {
-            streamsFrom = either;
-            doesntStreamFrom = or;
+            foundRange = true;
+            assertDoesntStreamRangeFrom(range, task2);
         }
-        else
+        else if (task2 != null && task2.rangesToSync.contains(range))
         {
-            doesntStreamFrom = either;
-            streamsFrom = or;
+            foundRange = true;
+            assertDoesntStreamRangeFrom(range, task1);
         }
-
-        SyncTask task = tasks.get(pair(target, streamsFrom));
-        assertThat(task).isInstanceOf(AsymmetricRemoteSyncTask.class);
-        assertThat(task.rangesToSync).containsOnly(range);
-        assertDoesntStreamRangeFrom(range, tasks.get(pair(target, doesntStreamFrom)));
+        assertTrue(foundRange);
     }
 
     public static void assertDoesntStreamRangeFrom(Range<Token> range, SyncTask task)
diff --git a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
index ea5ebbf..1057a67 100644
--- a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
@@ -66,7 +66,7 @@ public class StreamingRepairTaskTest extends AbstractRepairTest
         ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
         RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges());
 
-        SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE);
+        SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE, false);
         StreamingRepairTask task = new StreamingRepairTask(desc, request.initiator, request.src, request.dst, request.ranges, desc.sessionId, PreviewKind.NONE, false);
 
         StreamPlan plan = task.createStreamPlan(request.dst);
@@ -79,7 +79,7 @@ public class StreamingRepairTaskTest extends AbstractRepairTest
         UUID sessionID = registerSession(cfs, false, true);
         ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
         RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges());
-        SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE);
+        SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE, false);
         StreamingRepairTask task = new StreamingRepairTask(desc, request.initiator, request.src, request.dst, request.ranges, null, PreviewKind.NONE, false);
 
         StreamPlan plan = task.createStreamPlan(request.dst);
diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/RangeDenormalizerTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/RangeDenormalizerTest.java
index a128f2b..3d66d83 100644
--- a/test/unit/org/apache/cassandra/repair/asymmetric/RangeDenormalizerTest.java
+++ b/test/unit/org/apache/cassandra/repair/asymmetric/RangeDenormalizerTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.cassandra.repair.asymmetric;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
@@ -39,7 +37,7 @@ public class RangeDenormalizerTest
     {
         // test when the new incoming range is fully contained within an existing incoming range
         StreamFromOptions dummy = new StreamFromOptions(null, range(0, 100));
-        Map<Range<Token>, StreamFromOptions> incoming = new HashMap<>();
+        RangeMap<StreamFromOptions> incoming = new RangeMap<>();
         incoming.put(range(0, 100), dummy);
         Set<Range<Token>> newInput = RangeDenormalizer.denormalize(range(30, 40), incoming);
         assertEquals(3, incoming.size());
@@ -55,7 +53,7 @@ public class RangeDenormalizerTest
     {
         // test when the new incoming range fully contains an existing incoming range
         StreamFromOptions dummy = new StreamFromOptions(null, range(40, 50));
-        Map<Range<Token>, StreamFromOptions> incoming = new HashMap<>();
+        RangeMap<StreamFromOptions> incoming = new RangeMap<>();
         incoming.put(range(40, 50), dummy);
         Set<Range<Token>> newInput = RangeDenormalizer.denormalize(range(0, 100), incoming);
         assertEquals(1, incoming.size());
@@ -73,7 +71,7 @@ public class RangeDenormalizerTest
         StreamFromOptions dummy = new StreamFromOptions(null, range(0, 100));
         StreamFromOptions dummy2 = new StreamFromOptions(null, range(200, 300));
         StreamFromOptions dummy3 = new StreamFromOptions(null, range(500, 600));
-        Map<Range<Token>, StreamFromOptions> incoming = new HashMap<>();
+        RangeMap<StreamFromOptions> incoming = new RangeMap<>();
         incoming.put(range(0, 100), dummy);
         incoming.put(range(200, 300), dummy2);
         incoming.put(range(500, 600), dummy3);
diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/RangeMapTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/RangeMapTest.java
new file mode 100644
index 0000000..0805ea7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/asymmetric/RangeMapTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class RangeMapTest
+{
+    @Test
+    public void randomTest()
+    {
+        int iterCount = 0;
+        while (iterCount < 10000)
+        {
+            RangeMap<Integer> rangeMap = new RangeMap<>();
+            int cnt = 2000;
+            int i = 0;
+            long seed = System.currentTimeMillis();
+            Random r = new Random(seed);
+            Set<Range<Token>> randomRanges = random(cnt, r);
+            for (Range<Token> range : randomRanges)
+                rangeMap.put(range, i++);
+
+            long a = r.nextLong() % 100000;
+            long b = r.nextLong() % 100000;
+            if (a == b) b++;
+
+            Range<Token> intersectionRange = r(a, b);
+
+            Set<Map.Entry<Range<Token>, Integer>> expected = new HashSet<>();
+            for (Map.Entry<Range<Token>, Integer> entry : rangeMap.entrySet())
+                if (intersectionRange.intersects(entry.getKey()))
+                    expected.add(new RangeMap.Entry<>(entry));
+
+            Set<Map.Entry<Range<Token>, Integer>> intersection = new HashSet<>(rangeMap.removeIntersecting(intersectionRange));
+
+            // no intersecting ranges left in the range map:
+            for (Map.Entry<Range<Token>, Integer> entry : rangeMap.entrySet())
+                assertFalse("seed:"+seed, intersectionRange.intersects(entry.getKey()));
+
+            assertEquals("seed:"+seed, expected, intersection);
+            if (++iterCount % 1000 == 0)
+                 System.out.println(iterCount);
+        }
+    }
+
+    Set<Range<Token>> random(int cnt, Random r)
+    {
+        Set<Long> uniqueTokens = new HashSet<>(cnt * 2);
+        for (int i = 0; i < cnt * 2; i++)
+            while(!uniqueTokens.add(r.nextLong() % 1000000));
+        List<Long> randomTokens = new ArrayList<>(uniqueTokens);
+        randomTokens.sort(Long::compare);
+
+        Set<Range<Token>> ranges = new HashSet<>(cnt);
+        for (int i = 0; i < cnt - 1; i++)
+        {
+            ranges.add(r(randomTokens.get(i), randomTokens.get(i+1)));
+            i++;
+        }
+        ranges.add(r(randomTokens.get(randomTokens.size() - 1), randomTokens.get(0) - 1));
+        return ranges;
+    }
+
+    private Range<Token> r(long left, long right)
+    {
+        return new Range<>(new Murmur3Partitioner.LongToken(left), new Murmur3Partitioner.LongToken(right));
+    }
+
+    @Test
+    public void testEmpty()
+    {
+        RangeMap<Integer> rmap = new RangeMap<>();
+        assertFalse(rmap.intersectingEntryIterator(r(1, 10)).hasNext());
+    }
+}
diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java
index 6c64b1a..01b0cae 100644
--- a/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java
+++ b/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java
@@ -22,16 +22,21 @@ package org.apache.cassandra.repair.asymmetric;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
+import java.util.TreeSet;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
+import org.apache.commons.collections.keyvalue.AbstractMapEntry;
 import org.junit.Test;
 
 import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -263,30 +268,30 @@ public class ReduceHelperTest
 
         HostDifferences n0 = reduced.get(A);
 
-        assertTrue(n0.get(B).equals(list(range(50, 100))));
-        assertTrue(n0.get(C).equals(list(range(0, 50))));
+        assertTrue(n0.get(B).equals(set(range(50, 100))));
+        assertTrue(n0.get(C).equals(set(range(0, 50))));
 
         HostDifferences n1 = reduced.get(B);
         assertEquals(0, n1.get(B).size());
         if (!n1.get(A).isEmpty())
         {
-            assertTrue(n1.get(C).equals(list(range(0, 50))));
-            assertTrue(n1.get(A).equals(list(range(50, 100))));
+            assertTrue(n1.get(C).equals(set(range(0, 50))));
+            assertTrue(n1.get(A).equals(set(range(50, 100))));
         }
         else
         {
-            assertTrue(n1.get(C).equals(list(range(0, 50), range(50, 100))));
+            assertTrue(n1.get(C).equals(set(range(0, 50), range(50, 100))));
         }
         HostDifferences n2 = reduced.get(C);
         assertEquals(0, n2.get(C).size());
         if (!n2.get(A).isEmpty())
         {
-            assertTrue(n2.get(A).equals(list(range(0,50))));
-            assertTrue(n2.get(B).equals(list(range(50, 100))));
+            assertTrue(n2.get(A).equals(set(range(0,50))));
+            assertTrue(n2.get(B).equals(set(range(50, 100))));
         }
         else
         {
-            assertTrue(n2.get(A).equals(list(range(0, 50), range(50, 100))));
+            assertTrue(n2.get(A).equals(set(range(0, 50), range(50, 100))));
         }
 
 
@@ -389,6 +394,14 @@ public class ReduceHelperTest
         return ret;
     }
 
+    @SafeVarargs
+    private static NavigableSet<Range<Token>> set(Range<Token> ... ranges)
+    {
+        NavigableSet<Range<Token>> res = new TreeSet<>(Comparator.comparing(o -> o.left));
+        res.addAll(Arrays.asList(ranges));
+        return res;
+    }
+
     static Murmur3Partitioner.LongToken longtok(long l)
     {
         return new Murmur3Partitioner.LongToken(l);
@@ -399,19 +412,24 @@ public class ReduceHelperTest
         return new Range<>(longtok(t), longtok(t2));
     }
 
+    static Map.Entry<Range<Token>, StreamFromOptions> rangeEntry(long t, long t2)
+    {
+        return new AbstractMapEntry(range(t, t2), new StreamFromOptions(null, null)) {};
+    }
+
     @Test
     public void testSubtractAllRanges()
     {
-        Set<Range<Token>> ranges = new HashSet<>();
-        ranges.add(range(10, 20)); ranges.add(range(40, 60));
+        Set<Map.Entry<Range<Token>, StreamFromOptions>> ranges = new HashSet<>();
+        ranges.add(rangeEntry(10, 20)); ranges.add(rangeEntry(40, 60));
         assertEquals(0, RangeDenormalizer.subtractFromAllRanges(ranges, range(0, 100)).size());
-        ranges.add(range(90, 110));
+        ranges.add(rangeEntry(90, 110));
         assertEquals(Sets.newHashSet(range(100, 110)), RangeDenormalizer.subtractFromAllRanges(ranges, range(0, 100)));
-        ranges.add(range(-10, 10));
+        ranges.add(rangeEntry(-10, 10));
         assertEquals(Sets.newHashSet(range(-10, 0), range(100, 110)), RangeDenormalizer.subtractFromAllRanges(ranges, range(0, 100)));
     }
 
-    private void assertStreamFromEither(List<Range<Token>> r1, List<Range<Token>> r2)
+    private void assertStreamFromEither(Collection<Range<Token>> r1, Collection<Range<Token>> r2)
     {
         assertTrue(r1.size() > 0 ^ r2.size() > 0);
     }
diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
index fa037a0..761a77c 100644
--- a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
@@ -150,7 +150,7 @@ public class RepairMessageSerializationsTest
         InetAddressAndPort src = InetAddressAndPort.getByName("127.0.0.2");
         InetAddressAndPort dst = InetAddressAndPort.getByName("127.0.0.3");
 
-        SyncRequest msg = new SyncRequest(buildRepairJobDesc(), initiator, src, dst, buildTokenRanges(), PreviewKind.NONE);
+        SyncRequest msg = new SyncRequest(buildRepairJobDesc(), initiator, src, dst, buildTokenRanges(), PreviewKind.NONE, false);
         serializeRoundTrip(msg, SyncRequest.serializer);
     }
 
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index 0a5a023..f4b3b1c 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -173,7 +173,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         InetAddressAndPort src = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.2", PORT);
         InetAddressAndPort dest = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.3", PORT);
 
-        SyncRequest message = new SyncRequest(DESC, local, src, dest, Collections.singleton(FULL_RANGE), PreviewKind.NONE);
+        SyncRequest message = new SyncRequest(DESC, local, src, dest, Collections.singleton(FULL_RANGE), PreviewKind.NONE, false);
         testRepairMessageWrite("service.SyncRequest.bin", SyncRequest.serializer, message);
     }
 
@@ -195,6 +195,7 @@ public class SerializationsTest extends AbstractSerializationsTester
             assert src.equals(message.src);
             assert dest.equals(message.dst);
             assert message.ranges.size() == 1 && message.ranges.contains(FULL_RANGE);
+            assert !message.asymmetric;
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org