Posted to commits@cassandra.apache.org by bd...@apache.org on 2019/02/14 21:27:47 UTC

[cassandra] branch trunk updated (cbf4da4 -> 70e6f29)

This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from cbf4da4  CASSANDRA-14991 Follow-up: clean up SSL Hot Reloading code
     new b30c8c9  Improve merkle tree size and time on heap
     new 1268530  Merge branch 'cassandra-3.0' into cassandra-3.11
     new 70e6f29  Merge branch 'cassandra-3.11' into trunk

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   3 +
 conf/cassandra.yaml                                |  11 +
 src/java/org/apache/cassandra/config/Config.java   |   5 +
 .../cassandra/config/DatabaseDescriptor.java       |  54 ++++
 .../org/apache/cassandra/repair/RepairJob.java     |   4 +-
 .../org/apache/cassandra/repair/RepairSession.java |  17 +-
 .../apache/cassandra/repair/ValidationManager.java |  17 +-
 .../cassandra/service/ActiveRepairService.java     |  12 +
 .../service/ActiveRepairServiceMBean.java          |   3 +
 .../apache/cassandra/service/StorageService.java   |  11 +-
 .../cassandra/service/StorageServiceMBean.java     |   4 +
 .../org/apache/cassandra/utils/MerkleTree.java     |  46 +++
 .../cassandra/config/DatabaseDescriptorTest.java   |  69 +++++
 .../org/apache/cassandra/repair/RepairJobTest.java | 317 +++++++++++++++++----
 .../org/apache/cassandra/repair/ValidatorTest.java | 158 +++++++++-
 .../org/apache/cassandra/utils/MerkleTreeTest.java |  98 ++++++-
 17 files changed, 762 insertions(+), 68 deletions(-)




[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by bd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 70e6f2952fd648f45aa9075909c13e5783141318
Merge: cbf4da4 1268530
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Thu Feb 14 13:21:54 2019 -0800

    Merge branch 'cassandra-3.11' into trunk


diff --cc CHANGES.txt
index 87f3e2b,00ca115..1394ea8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -354,6 -11,6 +354,7 @@@
   * Make stop-server.bat wait for Cassandra to terminate (CASSANDRA-14829)
   * Correct sstable sorting for garbagecollect and levelled compaction (CASSANDRA-14870)
  Merged from 3.0:
++ * Improve merkle tree size and time on heap (CASSANDRA-14096)
   * Severe concurrency issues in STCS,DTCS,TWCS,TMD.Topology,TypeParser
   * Add a script to make running the cqlsh tests in cassandra repo easier (CASSANDRA-14951)
   * If SizeEstimatesRecorder misses a 'onDropTable' notification, the size_estimates table will never be cleared for that table. (CASSANDRA-14905)
diff --cc NEWS.txt
index 00c3178,d5a9128..d9950e3
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -113,112 -47,8 +113,115 @@@ New feature
  
  Upgrading
  ---------
 -	- repair_session_max_tree_depth setting has been added to cassandra.yaml to allow operators to reduce
 -	  merkle tree size if repair is creating too much heap pressure. See CASSANDRA-14096 for details.
 +    - CASSANDRA-13241 lowered the default chunk_length_in_kb for compressed tables from
 +      64kb to 16kb. For highly compressible data this can have a noticeable impact
 +      on space utilization. You may want to consider manually specifying this value.
 +    - Additional columns have been added to system_distributed.repair_history,
 +      system_traces.sessions and system_traces.events. As a result select queries
 +      against these tables will fail and generate an error in the log
 +      during upgrade when the cluster is mixed version. The tables can be made
 +      readable by following the instructions in CASSANDRA-14897 to add the
 +      new columns to the system tables before upgrading.
 +    - Timestamp ties between values resolve differently: if either value has a TTL,
 +      this value always wins. This is to provide consistent reconciliation before
 +      and after the value expires into a tombstone.
 +    - Cassandra 4.0 removed support for COMPACT STORAGE tables. All Compact Tables
 +      have to be migrated using `ALTER ... DROP COMPACT STORAGE` statement in 3.0/3.11.
 +      Cassandra starting 4.0 will not start if flags indicate that the table is non-CQL.
 +      Syntax for creating compact tables is also deprecated.
 +    - Support for legacy auth tables in the system_auth keyspace (users,
 +      permissions, credentials) and the migration code has been removed. Migration
 +      of these legacy auth tables must have been completed before the upgrade to
 +      4.0 and the legacy tables must have been removed. See the 'Upgrading' section
 +      for version 2.2 for migration instructions.
 +    - Cassandra 4.0 removed support for the deprecated Thrift interface. Amongst
 +      other things, this implies the removal of all yaml options related to thrift
 +      ('start_rpc', rpc_port, ...).
 +    - Cassandra 4.0 removed support for any pre-3.0 format. This means you
 +      cannot upgrade from a 2.x version to 4.0 directly, you have to upgrade to
 +      a 3.0.x/3.x version first (and run upgradesstables). In particular, this
 +      means Cassandra 4.0 cannot load or read pre-3.0 sstables in any way: you
 +      will need to upgrade those sstables in 3.0.x/3.x first.
 +    - Upgrades from 3.0.x or 3.x are supported since 3.0.13 or 3.11.0, previous
 +      versions will cause issues during rolling upgrades (CASSANDRA-13274).
 +    - Cassandra will no longer allow invalid keyspace replication options, such
 +      as invalid datacenter names for NetworkTopologyStrategy. Operators MUST
 +      add new nodes to a datacenter before they can set ALTER or CREATE
 +      keyspace replication policies using that datacenter. Existing keyspaces
 +      will continue to operate, but CREATE and ALTER will validate that all
 +      datacenters specified exist in the cluster.
 +    - Cassandra 4.0 fixes a problem with incremental repair which caused repaired
 +      data to be inconsistent between nodes. The fix changes the behavior of both
 +      full and incremental repairs. For full repairs, data is no longer marked
 +      repaired. For incremental repairs, anticompaction is run at the beginning
 +      of the repair, instead of at the end. If incremental repair was being used
 +      prior to upgrading, a full repair should be run after upgrading to resolve
 +      any inconsistencies.
 +    - Config option index_interval has been removed (it was deprecated since 2.0)
 +    - Deprecated repair JMX APIs are removed.
 +    - The version of snappy-java has been upgraded to 1.1.2.6
 +	- the minimum value for internode message timeouts is 10ms. Previously, any
 +	  positive value was allowed. See cassandra.yaml entries like
 +	  read_request_timeout_in_ms for more details.
 +	- Cassandra 4.0 allows a single port to be used for both secure and insecure
 +	  connections between cassandra nodes (CASSANDRA-10404). See the yaml for
 +	  specific property changes, and see the security doc for full details.
 +    - Due to the parallelization of the initial build of materialized views,
 +      the per token range view building status is stored in the new table
 +      `system.view_builds_in_progress`. The old table `system.views_builds_in_progress`
 +      is no longer used and can be removed. See CASSANDRA-12245 for more details.
 +	- Config option commitlog_sync_batch_window_in_ms has been deprecated as its
 +	  documentation has been incorrect and the setting itself near useless.
 +	  Batch mode remains a valid commit log mode, however.
 +	- There is a new commit log mode, group, which is similar to batch mode
 +	  but blocks for up to a configurable number of milliseconds between disk flushes.
 +	- nodetool clearsnapshot now requires the --all flag to remove all snapshots.
 +	  Previous behavior would delete all snapshots by default.
 +    - Nodes are now identified by a combination of IP and storage port.
 +      Existing JMX APIs, nodetool, and system tables continue to work
 +      and accept/return just an IP, but there is a new
 +      version of each that works with the full unambiguous identifier.
 +      You should prefer these over the deprecated ambiguous versions that only
 +      work with an IP. This was done to support multiple instances per IP.
 +      Additionally we are moving to only using a single port for encrypted and
 +      unencrypted traffic and if you want multiple instances per IP you must
 +      first switch encrypted traffic to the storage port and not a separate
 +      encrypted port. If you want to use multiple instances per IP
 +      with SSL you will need to use StartTLS on storage_port and set
 +      outgoing_encrypted_port_source to gossip so outbound connections
 +      know what port to connect to for each instance. Before changing
 +      storage port or native port at nodes you must first upgrade the entire cluster
 +      and clients to 4.0 so they can handle the port not being consistent across
 +      the cluster.
 +    - Names of AWS regions/availability zones have been cleaned up to more correctly
 +      match the Amazon names. There is now a new option in conf/cassandra-rackdc.properties
 +      that lets users enable the correct names for new clusters, or use the legacy
 +      names for existing clusters. See conf/cassandra-rackdc.properties for details.
 +    - Background repair has been removed. dclocal_read_repair_chance and
 +      read_repair_chance table options have been removed and are now rejected.
 +      See CASSANDRA-13910 for details.
 +    - Internode TCP connections that do not ack segments for 30s will now
 +      be automatically detected and closed via the Linux TCP_USER_TIMEOUT
 +      socket option. This should be exceedingly rare, but AWS networks (and
 +      other stateful firewalls) apparently suffer from this issue. You can
 +      tune the timeouts on TCP connection and segment ack via the
 +      `cassandra.yaml:internode_tcp_connect_timeout_in_ms` and
 +      `cassandra.yaml:internode_tcp_user_timeout_in_ms` options respectively.
 +      See CASSANDRA-14358 for details.
++	- repair_session_space_in_mb setting has been added to cassandra.yaml to allow operators to reduce
++	  merkle tree size if repair is creating too much heap pressure. The repair_session_max_tree_depth
++	  setting added in 3.0.19 and 3.11.5 is deprecated in favor of this setting. See CASSANDRA-14096 for details.
 +
 +Materialized Views
 +-------------------
 +   - Following a discussion regarding concerns about the design and safety of Materialized Views, the C* development
 +     community no longer recommends them for production use, and considers them experimental. Warning messages will
 +     now be logged when they are created. (See https://www.mail-archive.com/dev@cassandra.apache.org/msg11511.html)
 +   - An 'enable_materialized_views' flag has been added to cassandra.yaml to allow operators to prevent creation of
 +     views
 +   - CREATE MATERIALIZED VIEW syntax has become stricter. Partition key columns are no longer implicitly considered
 +     to be NOT NULL, and no base primary key columns get automatically included in view definition. You have to
 +     specify them explicitly now.
  
  3.11.4
  ======
diff --cc conf/cassandra.yaml
index dde4296,a263d8a..78fb162
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -506,6 -498,19 +506,17 @@@ concurrent_materialized_view_writes: 3
  #    off heap objects
  memtable_allocation_type: heap_buffers
  
 -# Limits the maximum Merkle tree depth to avoid consuming too much
 -# memory during repairs.
 -#
 -# The default setting of 18 generates trees of maximum size around
 -# 50 MiB / tree. If you are running out of memory during repairs consider
 -# lowering this to 15 (~6 MiB / tree) or lower, but try not to lower it
 -# too much past that or you will lose too much resolution and stream
 -# too much redundant data during repair. Cannot be set lower than 10.
++# Limit memory usage for Merkle tree calculations during repairs. The default
++# is 1/16th of the available heap. The main tradeoff is that smaller trees
++# have less resolution, which can lead to over-streaming data. If you see heap
++# pressure during repairs, consider lowering this, but you cannot go below
++# one megabyte. If you see lots of over-streaming, consider raising
++# this or using subrange repair.
+ #
+ # For more details see https://issues.apache.org/jira/browse/CASSANDRA-14096.
+ #
 -# repair_session_max_tree_depth: 18
++# repair_session_space_in_mb:
+ 
  # Total space to use for commit logs on disk.
  #
  # If space gets above this value, Cassandra will flush every dirty CF
diff --cc src/java/org/apache/cassandra/config/Config.java
index 4a55b2e,528cf4f..a95db23
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -123,6 -123,9 +123,11 @@@ public class Confi
      public Integer memtable_offheap_space_in_mb;
      public Float memtable_cleanup_threshold = null;
  
+     // Limit the maximum depth of repair session merkle trees
 -    public volatile int repair_session_max_tree_depth = 18;
++    @Deprecated
++    public volatile Integer repair_session_max_tree_depth = null;
++    public volatile Integer repair_session_space_in_mb = null;
+ 
      public int storage_port = 7000;
      public int ssl_storage_port = 7001;
      public String listen_address;
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 5c70c9a,069a17e..3f80d71
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -461,7 -437,13 +461,28 @@@ public class DatabaseDescripto
          else
              logger.info("Global memtable off-heap threshold is enabled at {}MB", conf.memtable_offheap_space_in_mb);
  
 -        if (conf.repair_session_max_tree_depth < 10)
 -            throw new ConfigurationException("repair_session_max_tree_depth should not be < 10, but was " + conf.repair_session_max_tree_depth);
 -        if (conf.repair_session_max_tree_depth > 20)
 -            logger.warn("repair_session_max_tree_depth of " + conf.repair_session_max_tree_depth + " > 20 could lead to excessive memory usage");
++        if (conf.repair_session_max_tree_depth != null)
++        {
++            logger.warn("repair_session_max_tree_depth has been deprecated and should be removed from cassandra.yaml. Use repair_session_space_in_mb instead");
++            if (conf.repair_session_max_tree_depth < 10)
++                throw new ConfigurationException("repair_session_max_tree_depth should not be < 10, but was " + conf.repair_session_max_tree_depth);
++            if (conf.repair_session_max_tree_depth > 20)
++                logger.warn("repair_session_max_tree_depth of " + conf.repair_session_max_tree_depth + " > 20 could lead to excessive memory usage");
++        }
++        else
++        {
++            conf.repair_session_max_tree_depth = 20;
++        }
++
++        if (conf.repair_session_space_in_mb == null)
++            conf.repair_session_space_in_mb = Math.max(1, (int) (Runtime.getRuntime().maxMemory() / (16 * 1048576)));
++
++        if (conf.repair_session_space_in_mb < 1)
++            throw new ConfigurationException("repair_session_space_in_mb must be > 0, but was " + conf.repair_session_space_in_mb);
++        else if (conf.repair_session_space_in_mb > (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576)))
++            logger.warn("A repair_session_space_in_mb of " + conf.repair_session_space_in_mb + " megabytes is likely to cause heap pressure");
+ 
 -        if (conf.thrift_framed_transport_size_in_mb <= 0)
 -            throw new ConfigurationException("thrift_framed_transport_size_in_mb must be positive, but was " + conf.thrift_framed_transport_size_in_mb, false);
 +        checkForLowestAcceptedTimeouts(conf);
  
          if (conf.native_transport_max_frame_size_in_mb <= 0)
              throw new ConfigurationException("native_transport_max_frame_size_in_mb must be positive, but was " + conf.native_transport_max_frame_size_in_mb, false);
@@@ -2382,11 -2280,27 +2403,44 @@@
          return conf.memtable_allocation_type;
      }
  
 -    public static Float getMemtableCleanupThreshold()
 -    {
 -        return conf.memtable_cleanup_threshold;
 -    }
 -
+     public static int getRepairSessionMaxTreeDepth()
+     {
+         return conf.repair_session_max_tree_depth;
+     }
+ 
+     public static void setRepairSessionMaxTreeDepth(int depth)
+     {
+         if (depth < 10)
+             throw new ConfigurationException("Cannot set repair_session_max_tree_depth to " + depth +
+                                              " which is < 10, doing nothing");
+         else if (depth > 20)
+             logger.warn("repair_session_max_tree_depth of " + depth + " > 20 could lead to excessive memory usage");
+ 
+         conf.repair_session_max_tree_depth = depth;
+     }
+ 
++    public static int getRepairSessionSpaceInMegabytes()
++    {
++        return conf.repair_session_space_in_mb;
++    }
++
++    public static void setRepairSessionSpaceInMegabytes(int sizeInMegabytes)
++    {
++        if (sizeInMegabytes < 1)
++            throw new ConfigurationException("Cannot set repair_session_space_in_mb to " + sizeInMegabytes +
++                                             " < 1 megabyte");
++        else if (sizeInMegabytes > (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576)))
++            logger.warn("A repair_session_space_in_mb of " + sizeInMegabytes +
++                        " megabytes is likely to cause heap pressure.");
++
++        conf.repair_session_space_in_mb = sizeInMegabytes;
++    }
++
 +    public static Float getMemtableCleanupThreshold()
 +    {
 +        return conf.memtable_cleanup_threshold;
 +    }
 +
      public static int getIndexSummaryResizeIntervalInMinutes()
      {
          return conf.index_summary_resize_interval_in_minutes;
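
The bounds that the DatabaseDescriptor hunk above applies to repair_session_space_in_mb can be restated as a small standalone sketch. It only mirrors the arithmetic shown in the patch (default of 1/16th of the max heap with a floor of 1 MB, a warning above 1/4 of the max heap, rejection below 1 MB). The class name RepairSessionSpaceSketch and its method names are hypothetical and do not exist in Cassandra; the real logic lives in DatabaseDescriptor and reads the heap size from Runtime.getRuntime().maxMemory().

```java
// Illustrative sketch only: restates the arithmetic from the DatabaseDescriptor hunk.
// The class and method names are hypothetical and are not part of Cassandra.
final class RepairSessionSpaceSketch
{
    private static final long BYTES_PER_MB = 1048576L;

    // Default: 1/16th of the max heap, expressed in megabytes, never below 1.
    static int defaultSpaceInMb(long maxHeapBytes)
    {
        return Math.max(1, (int) (maxHeapBytes / (16 * BYTES_PER_MB)));
    }

    // Values above 1/4 of the max heap trigger the heap-pressure warning in the patch.
    static boolean exceedsWarnThreshold(int spaceInMb, long maxHeapBytes)
    {
        return spaceInMb > (int) (maxHeapBytes / (4 * BYTES_PER_MB));
    }

    // Values below one megabyte are rejected (the patch throws ConfigurationException).
    static boolean isRejected(int spaceInMb)
    {
        return spaceInMb < 1;
    }

    public static void main(String[] args)
    {
        long heap = Runtime.getRuntime().maxMemory();
        int def = defaultSpaceInMb(heap);
        System.out.println("max heap bytes: " + heap);
        System.out.println("default repair_session_space_in_mb: " + def);
        System.out.println("default exceeds warn threshold: " + exceedsWarnThreshold(def, heap));
    }
}
```

Under these assumptions an 8 GiB heap yields a default of 512 MB and a warning threshold of 2048 MB, while a heap smaller than 16 MiB falls back to the 1 MB floor.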
diff --cc src/java/org/apache/cassandra/repair/RepairJob.java
index ebeb1f9,6f89a86..a67aac0
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@@ -17,13 -17,10 +17,14 @@@
   */
  package org.apache.cassandra.repair;
  
 -import java.net.InetAddress;
  import java.util.*;
 +import java.util.function.Predicate;
 +import java.util.function.Function;
 +import java.util.stream.Collectors;
  
+ import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.ImmutableMap;
  import com.google.common.util.concurrent.*;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -239,96 -171,6 +240,97 @@@ public class RepairJob extends Abstract
          return syncTasks;
      }
  
 +    private ListenableFuture<List<SyncStat>> optimisedSyncing(List<TreeResponse> trees)
 +    {
 +        List<SyncTask> syncTasks = createOptimisedSyncingSyncTasks(desc,
 +                                                                   trees,
 +                                                                   FBUtilities.getLocalAddressAndPort(),
 +                                                                   this::isTransient,
 +                                                                   this::getDC,
 +                                                                   session.isIncremental,
 +                                                                   session.previewKind);
 +
 +        return executeTasks(syncTasks);
 +    }
 +
-     private ListenableFuture<List<SyncStat>> executeTasks(List<SyncTask> syncTasks)
++    @VisibleForTesting
++    ListenableFuture<List<SyncStat>> executeTasks(List<SyncTask> syncTasks)
 +    {
 +        for (SyncTask task : syncTasks)
 +        {
 +            if (!task.isLocal())
 +                session.trackSyncCompletion(Pair.create(desc, task.nodePair()), (CompletableRemoteSyncTask) task);
 +            taskExecutor.submit(task);
 +        }
 +
 +        return Futures.allAsList(syncTasks);
 +    }
 +
 +    static List<SyncTask> createOptimisedSyncingSyncTasks(RepairJobDesc desc,
 +                                                          List<TreeResponse> trees,
 +                                                          InetAddressAndPort local,
 +                                                          Predicate<InetAddressAndPort> isTransient,
 +                                                          Function<InetAddressAndPort, String> getDC,
 +                                                          boolean isIncremental,
 +                                                          PreviewKind previewKind)
 +    {
 +        List<SyncTask> syncTasks = new ArrayList<>();
 +        // We need to difference all trees one against another
 +        DifferenceHolder diffHolder = new DifferenceHolder(trees);
 +
 +        logger.debug("diffs = {}", diffHolder);
 +        PreferedNodeFilter preferSameDCFilter = (streaming, candidates) ->
 +                                                candidates.stream()
 +                                                          .filter(node -> getDC.apply(streaming)
 +                                                                          .equals(getDC.apply(node)))
 +                                                          .collect(Collectors.toSet());
 +        ImmutableMap<InetAddressAndPort, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter);
 +
 +        for (int i = 0; i < trees.size(); i++)
 +        {
 +            InetAddressAndPort address = trees.get(i).endpoint;
 +
 +            // we don't stream to transient replicas
 +            if (isTransient.test(address))
 +                continue;
 +
 +            HostDifferences streamsFor = reducedDifferences.get(address);
 +            if (streamsFor != null)
 +            {
 +                Preconditions.checkArgument(streamsFor.get(address).isEmpty(), "We should not fetch ranges from ourselves");
 +                for (InetAddressAndPort fetchFrom : streamsFor.hosts())
 +                {
 +                    List<Range<Token>> toFetch = streamsFor.get(fetchFrom);
 +                    assert !toFetch.isEmpty();
 +
 +                    logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom);
 +                    SyncTask task;
 +                    if (address.equals(local))
 +                    {
 +                        task = new LocalSyncTask(desc, address, fetchFrom, toFetch, isIncremental ? desc.parentSessionId : null,
 +                                                 true, false, previewKind);
 +                    }
 +                    else
 +                    {
 +                        task = new AsymmetricRemoteSyncTask(desc, address, fetchFrom, toFetch, previewKind);
 +                    }
 +                    syncTasks.add(task);
 +
 +                }
 +            }
 +            else
 +            {
 +                logger.debug("Node {} has nothing to stream", address);
 +            }
 +        }
 +        return syncTasks;
 +    }
 +
 +    private String getDC(InetAddressAndPort address)
 +    {
 +        return DatabaseDescriptor.getEndpointSnitch().getDatacenter(address);
 +    }
 +
      /**
       * Creates {@link ValidationTask} and submit them to task executor in parallel.
       *
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index a38205e,3d25cbf..40f3dbe
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -100,13 -96,12 +101,13 @@@ public class RepairSession extends Abst
      private final AtomicBoolean isFailed = new AtomicBoolean(false);
  
      // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
 -    private final ConcurrentMap<Pair<RepairJobDesc, InetAddress>, ValidationTask> validating = new ConcurrentHashMap<>();
 +    private final ConcurrentMap<Pair<RepairJobDesc, InetAddressAndPort>, ValidationTask> validating = new ConcurrentHashMap<>();
      // Remote syncing jobs wait response in syncingTasks map
 -    private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
 +    private final ConcurrentMap<Pair<RepairJobDesc, SyncNodePair>, CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
  
      // Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor
-     public final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask"));
+     public final ListeningExecutorService taskExecutor;
 +    public final boolean optimiseStreams;
  
      private volatile boolean terminated = false;
  
@@@ -140,38 -135,17 +141,44 @@@
          this.parallelismDegree = parallelismDegree;
          this.keyspace = keyspace;
          this.cfnames = cfnames;
 -        this.ranges = ranges;
 -        this.endpoints = endpoints;
 -        this.repairedAt = repairedAt;
 +
 +        //If force then filter out dead endpoints
 +        boolean forceSkippedReplicas = false;
 +        if (force)
 +        {
 +            logger.debug("force flag set, removing dead endpoints");
 +            final Set<InetAddressAndPort> removeCandidates = new HashSet<>();
 +            for (final InetAddressAndPort endpoint : commonRange.endpoints)
 +            {
 +                if (!FailureDetector.instance.isAlive(endpoint))
 +                {
 +                    logger.info("Removing a dead node from Repair due to -force {}", endpoint);
 +                    removeCandidates.add(endpoint);
 +                }
 +            }
 +            if (!removeCandidates.isEmpty())
 +            {
 +                // we shouldn't be recording a successful repair if
 +                // any replicas are excluded from the repair
 +                forceSkippedReplicas = true;
 +                Set<InetAddressAndPort> filteredEndpoints = new HashSet<>(commonRange.endpoints);
 +                filteredEndpoints.removeAll(removeCandidates);
 +                commonRange = new CommonRange(filteredEndpoints, commonRange.transEndpoints, commonRange.ranges);
 +            }
 +        }
 +
 +        this.commonRange = commonRange;
 +        this.isIncremental = isIncremental;
 +        this.previewKind = previewKind;
          this.pullRepair = pullRepair;
 +        this.skippedReplicas = forceSkippedReplicas;
 +        this.optimiseStreams = optimiseStreams;
+         this.taskExecutor = MoreExecutors.listeningDecorator(createExecutor());
+     }
+ 
 -    @VisibleForTesting
+     protected DebuggableThreadPoolExecutor createExecutor()
+     {
+         return DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask");
      }
  
      public UUID getId()
@@@ -228,20 -197,25 +235,26 @@@
       * @param nodes nodes that completed sync
       * @param success true if sync succeeded
       */
 -    public void syncComplete(RepairJobDesc desc, NodePair nodes, boolean success)
 +    public void syncComplete(RepairJobDesc desc, SyncNodePair nodes, boolean success, List<SessionSummary> summaries)
      {
-         CompletableRemoteSyncTask task = syncingTasks.get(Pair.create(desc, nodes));
 -        RemoteSyncTask task = syncingTasks.get(Pair.create(desc, nodes));
++        CompletableRemoteSyncTask task = syncingTasks.remove(Pair.create(desc, nodes));
          if (task == null)
          {
              assert terminated;
              return;
          }
  
 -        logger.debug("[repair #{}] Repair completed between {} and {} on {}", getId(), nodes.endpoint1, nodes.endpoint2, desc.columnFamily);
 -        task.syncComplete(success);
 +        if (logger.isDebugEnabled())
 +            logger.debug("{} Repair completed between {} and {} on {}", previewKind.logPrefix(getId()), nodes.coordinator, nodes.peer, desc.columnFamily);
 +        task.syncComplete(success, summaries);
      }
  
+     @VisibleForTesting
 -    Map<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> getSyncingTasks()
++    Map<Pair<RepairJobDesc, SyncNodePair>, CompletableRemoteSyncTask> getSyncingTasks()
+     {
+         return Collections.unmodifiableMap(syncingTasks);
+     }
+ 
      private String repairedNodes()
      {
          StringBuilder sb = new StringBuilder();
diff --cc src/java/org/apache/cassandra/repair/ValidationManager.java
index d664c8a,0000000..bbd5219
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/ValidationManager.java
+++ b/src/java/org/apache/cassandra/repair/ValidationManager.java
@@@ -1,163 -1,0 +1,176 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.repair;
 +
 +import java.io.IOException;
 +import java.util.Collection;
 +import java.util.Map;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
++import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.metrics.TableMetrics;
 +import org.apache.cassandra.utils.FBUtilities;
++import org.apache.cassandra.utils.MerkleTree;
 +import org.apache.cassandra.utils.MerkleTrees;
 +
 +public class ValidationManager
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(ValidationManager.class);
 +
 +    public static final ValidationManager instance = new ValidationManager();
 +
 +    private ValidationManager() {}
 +
 +    private static MerkleTrees createMerkleTrees(ValidationPartitionIterator validationIterator, Collection<Range<Token>> ranges, ColumnFamilyStore cfs)
 +    {
 +        MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
 +        long allPartitions = validationIterator.estimatedPartitions();
 +        Map<Range<Token>, Long> rangePartitionCounts = validationIterator.getRangePartitionCounts();
 +
++        // The repair coordinator must hold RF trees in memory at once, so a given validation compaction can only
++        // use 1 / RF of the allowed space.
++        long availableBytes = (DatabaseDescriptor.getRepairSessionSpaceInMegabytes() * 1048576L) /
++                              cfs.keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
++
 +        for (Range<Token> range : ranges)
 +        {
 +            long numPartitions = rangePartitionCounts.get(range);
 +            double rangeOwningRatio = allPartitions > 0 ? (double)numPartitions / allPartitions : 0;
 +            // determine max tree depth proportional to range size to avoid blowing up memory with multiple trees,
-             // capping at 20 to prevent large tree (CASSANDRA-11390)
-             int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0;
++            // capping at a depth that does not exceed our memory budget (CASSANDRA-11390, CASSANDRA-14096)
++            int rangeAvailableBytes = Math.max(1, (int) (rangeOwningRatio * availableBytes));
++            // Try to estimate max tree depth that fits the space budget assuming hashes of 256 bits = 32 bytes
++            // note that estimatedMaxDepthForBytes cannot return a number lower than 1
++            int estimatedMaxDepth = MerkleTree.estimatedMaxDepthForBytes(cfs.getPartitioner(), rangeAvailableBytes, 32);
++            int maxDepth = rangeOwningRatio > 0
++                           ? Math.min(estimatedMaxDepth, DatabaseDescriptor.getRepairSessionMaxTreeDepth())
++                           : 0;
 +            // determine tree depth from number of partitions, capping at max tree depth (CASSANDRA-5263)
 +            int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0;
 +            tree.addMerkleTree((int) Math.pow(2, depth), range);
 +        }
 +        if (logger.isDebugEnabled())
 +        {
 +            // MT serialize may take time
 +            logger.debug("Created {} merkle trees with merkle trees size {}, {} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, MerkleTrees.serializer.serializedSize(tree, 0));
 +        }
 +
 +        return tree;
 +    }
 +
 +    private static ValidationPartitionIterator getValidationIterator(TableRepairManager repairManager, Validator validator) throws IOException
 +    {
 +        RepairJobDesc desc = validator.desc;
 +        return repairManager.getValidationIterator(desc.ranges, desc.parentSessionId, desc.sessionId, validator.isIncremental, validator.nowInSec);
 +    }
 +
 +    /**
 +     * Performs a readonly "compaction" of all sstables in order to validate complete rows,
 +     * but without writing the merge result
 +     */
 +    @SuppressWarnings("resource")
 +    private void doValidation(ColumnFamilyStore cfs, Validator validator) throws IOException
 +    {
 +        // this isn't meant to be race-proof, because it's not -- it won't cause bugs for a CFS to be dropped
 +        // mid-validation, or to attempt to validate a dropped CFS.  this is just a best effort to avoid useless work,
 +        // particularly in the scenario where a validation is submitted before the drop, and there are compactions
 +        // started prior to the drop keeping some sstables alive.  Since validationCompaction can run
 +        // concurrently with other compactions, it would otherwise go ahead and scan those again.
 +        if (!cfs.isValid())
 +            return;
 +
 +        // Create Merkle trees suitable to hold estimated partitions for the given ranges.
 +        // We blindly assume that a partition is evenly distributed on all sstables for now.
 +        long start = System.nanoTime();
 +        long partitionCount = 0;
 +        long estimatedTotalBytes = 0;
 +        try (ValidationPartitionIterator vi = getValidationIterator(cfs.getRepairManager(), validator))
 +        {
 +            MerkleTrees tree = createMerkleTrees(vi, validator.desc.ranges, cfs);
 +            try
 +            {
 +                // validate the CF as we iterate over it
 +                validator.prepare(cfs, tree);
 +                while (vi.hasNext())
 +                {
 +                    try (UnfilteredRowIterator partition = vi.next())
 +                    {
 +                        validator.add(partition);
 +                        partitionCount++;
 +                    }
 +                }
 +                validator.complete();
 +            }
 +            finally
 +            {
 +                estimatedTotalBytes = vi.getEstimatedBytes();
 +                partitionCount = vi.estimatedPartitions();
 +            }
 +        }
 +        finally
 +        {
 +            cfs.metric.bytesValidated.update(estimatedTotalBytes);
 +            cfs.metric.partitionsValidated.update(partitionCount);
 +        }
 +        if (logger.isDebugEnabled())
 +        {
 +            long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
 +            logger.debug("Validation of {} partitions (~{}) finished in {} msec, for {}",
 +                         partitionCount,
 +                         FBUtilities.prettyPrintMemory(estimatedTotalBytes),
 +                         duration,
 +                         validator.desc);
 +        }
 +    }
 +
 +    /**
 +     * Does not mutate data, so is not scheduled.
 +     */
 +    public Future<?> submitValidation(ColumnFamilyStore cfs, Validator validator)
 +    {
 +        Callable<Object> validation = new Callable<Object>()
 +        {
 +            public Object call() throws IOException
 +            {
 +                try (TableMetrics.TableTimer.Context c = cfs.metric.validationTime.time())
 +                {
 +                    doValidation(cfs, validator);
 +                }
 +                catch (Throwable e)
 +                {
 +                    // we need to inform the remote end of our failure, otherwise it will hang on repair forever
 +                    validator.fail();
 +                    throw e;
 +                }
 +                return this;
 +            }
 +        };
 +
 +        return cfs.getRepairManager().submitValidation(validation);
 +    }
 +}
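
Editor's note: the sizing arithmetic in createMerkleTrees above can be tried in isolation. The following is a minimal sketch, not the Cassandra implementation: the class RepairTreeSizing and its two methods are hypothetical names, the budget is kept in long arithmetic throughout, and the memory-derived depth cap (estimatedMaxDepthForBytes in the patch) is assumed to be folded into the maxDepth argument by the caller.

```java
public final class RepairTreeSizing
{
    private RepairTreeSizing() {}

    /**
     * Bytes a single range may use: the session budget is divided by the replication
     * factor (the coordinator holds RF trees at once) and then scaled by the share of
     * partitions that fall in this range. Hypothetical helper, for illustration only.
     */
    static long rangeBudgetBytes(int sessionSpaceMb, int replicationFactor, long numPartitions, long allPartitions)
    {
        // 1048576L forces long multiplication so large budgets cannot overflow an int
        long availableBytes = (sessionSpaceMb * 1048576L) / replicationFactor;
        double rangeOwningRatio = allPartitions > 0 ? (double) numPartitions / allPartitions : 0;
        return Math.max(1, (long) (rangeOwningRatio * availableBytes));
    }

    /**
     * Tree depth for a range: ceil(log2(numPartitions)), capped at maxDepth, or 0 for
     * an empty range. Mirrors the expression used for "depth" in the patch.
     */
    static int depth(long numPartitions, int maxDepth)
    {
        if (numPartitions <= 0)
            return 0;
        return (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth);
    }

    public static void main(String[] args)
    {
        // 256 MB session budget, RF = 3, range owning 250 of 1000 partitions
        System.out.println(rangeBudgetBytes(256, 3, 250, 1000));
        // 1000 partitions need about 10 levels; 5,000,000 would need 23 but is capped at 20
        System.out.println(depth(1000, 20));
        System.out.println(depth(5_000_000, 20));
    }
}
```

In the patch the cap passed as maxDepth is the smaller of the memory-derived estimate and DatabaseDescriptor.getRepairSessionMaxTreeDepth(), so a small budget lowers the depth even when the configured maximum is high.
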
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 1a54e75,626aa91..525beba
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -156,40 -121,8 +156,52 @@@ public class ActiveRepairService implem
      {
          this.failureDetector = failureDetector;
          this.gossiper = gossiper;
 +        this.repairStatusByCmd = CacheBuilder.newBuilder()
 +                                             .expireAfterWrite(
 +                                             Long.getLong("cassandra.parent_repair_status_expiry_seconds",
 +                                                          TimeUnit.SECONDS.convert(1, TimeUnit.DAYS)), TimeUnit.SECONDS)
 +                                             // using weight wouldn't work so well, since it doesn't reflect mutation of cached data
 +                                             // see https://github.com/google/guava/wiki/CachesExplained
 +                                             // We assume each entry is unlikely to be much more than 100 bytes, so bounding the size should be sufficient.
 +                                             .maximumSize(Long.getLong("cassandra.parent_repair_status_cache_size", 100_000))
 +                                             .build();
 +
 +        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
 +    }
 +
 +    public void start()
 +    {
 +        consistent.local.start();
 +        ScheduledExecutors.optionalTasks.scheduleAtFixedRate(consistent.local::cleanup, 0,
 +                                                             LocalSessions.CLEANUP_INTERVAL,
 +                                                             TimeUnit.SECONDS);
 +    }
 +
 +    @Override
 +    public List<Map<String, String>> getSessions(boolean all)
 +    {
 +        return consistent.local.sessionInfo(all);
 +    }
 +
 +    @Override
 +    public void failSession(String session, boolean force)
 +    {
 +        UUID sessionID = UUID.fromString(session);
 +        consistent.local.cancelSession(sessionID, force);
 +    }
 +
++    @Override
++    public void setRepairSessionSpaceInMegabytes(int sizeInMegabytes)
++    {
++        DatabaseDescriptor.setRepairSessionSpaceInMegabytes(sizeInMegabytes);
++    }
++
++    @Override
++    public int getRepairSessionSpaceInMegabytes()
++    {
++        return DatabaseDescriptor.getRepairSessionSpaceInMegabytes();
+     }
+ 
      /**
       * Requests repairs for the given keyspace and column families.
       *
diff --cc src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
index 53b0acb,0000000..283d466
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
@@@ -1,30 -1,0 +1,33 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service;
 +
 +import java.util.List;
 +import java.util.Map;
 +
 +public interface ActiveRepairServiceMBean
 +{
 +    public static final String MBEAN_NAME = "org.apache.cassandra.db:type=RepairService";
 +
 +    public List<Map<String, String>> getSessions(boolean all);
 +    public void failSession(String session, boolean force);
++
++    public void setRepairSessionSpaceInMegabytes(int sizeInMegabytes);
++    public int getRepairSessionSpaceInMegabytes();
 +}
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index ca453d0,8f4b1e7..6a57493
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -3868,15 -3688,16 +3868,24 @@@ public class StorageService extends Not
          ActiveRepairService.instance.terminateSessions();
      }
  
- 
 +    @Nullable
 +    public List<String> getParentRepairStatus(int cmd)
 +    {
 +        Pair<ActiveRepairService.ParentRepairStatus, List<String>> pair = ActiveRepairService.instance.getRepairStatus(cmd);
 +        return pair == null ? null :
 +               ImmutableList.<String>builder().add(pair.left.name()).addAll(pair.right).build();
 +    }
 +
+     public void setRepairSessionMaxTreeDepth(int depth)
+     {
+         DatabaseDescriptor.setRepairSessionMaxTreeDepth(depth);
+     }
+ 
+     public int getRepairSessionMaxTreeDepth()
+     {
+         return DatabaseDescriptor.getRepairSessionMaxTreeDepth();
+     }
+ 
      /* End of MBean interface methods */
  
      /**
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index e82d1ba,bcf55cf..e74f002
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -357,22 -330,65 +357,26 @@@ public interface StorageServiceMBean ex
       */
      public int repairAsync(String keyspace, Map<String, String> options);
  
 -    /**
 -     * @deprecated use {@link #repairAsync(String keyspace, Map options)} instead.
 -     */
 -    @Deprecated
 -    public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts,  boolean primaryRange, boolean fullRepair, String... tableNames) throws IOException;
 -
 -    /**
 -     * Invoke repair asynchronously.
 -     * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean.
 -     * Notification format is:
 -     *   type: "repair"
 -     *   userObject: int array of length 2, [0]=command number, [1]=ordinal of ActiveRepairService.Status
 -     *
 -     * @deprecated use {@link #repairAsync(String keyspace, Map options)} instead.
 -     *
 -     * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel
 -     * @return Repair command number, or 0 if nothing to repair
 -     */
 -    @Deprecated
 -    public int forceRepairAsync(String keyspace, int parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... tableNames);
 -
 -    /**
 -     * @deprecated use {@link #repairAsync(String keyspace, Map options)} instead.
 -     */
 -    @Deprecated
 -    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... tableNames) throws IOException;
 -
 -    /**
 -     * Same as forceRepairAsync, but handles a specified range
 -     *
 -     * @deprecated use {@link #repairAsync(String keyspace, Map options)} instead.
 -     *
 -     * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel
 -     */
 -    @Deprecated
 -    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, int parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... tableNames);
 -
 -    /**
 -     * @deprecated use {@link #repairAsync(String keyspace, Map options)} instead.
 -     */
 -    @Deprecated
 -    public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... tableNames);
 -
 -    /**
 -     * @deprecated use {@link #repairAsync(String keyspace, Map options)} instead.
 -     */
 -    @Deprecated
 -    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... tableNames);
 -
      public void forceTerminateAllRepairSessions();
  
+     public void setRepairSessionMaxTreeDepth(int depth);
+ 
+     public int getRepairSessionMaxTreeDepth();
+ 
      /**
 +     * Get the status of a given parent repair session.
 +     * @param cmd the int reference returned when issuing the repair
 +     * @return status of parent repair from enum {@link org.apache.cassandra.repair.RepairRunnable.Status}
 +     * followed by final message or messages of the session
 +     */
 +    @Nullable
 +    public List<String> getParentRepairStatus(int cmd);
 +
 +    /**
       * transfer this node's data to other machines and remove it from service.
 +     * @param force Decommission even if this will reduce N to be less than RF.
       */
 -    public void decommission() throws InterruptedException;
 +    public void decommission(boolean force) throws InterruptedException;
  
      /**
       * @param newToken token to move this node to.
diff --cc src/java/org/apache/cassandra/utils/MerkleTree.java
index 143d839,9572a27..1d51f03
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@@ -1155,4 -1190,4 +1155,50 @@@ public class MerkleTree implements Seri
              public TooDeep(){ super(); }
          }
      }
++
++    /**
++     * Estimate the allowable depth while keeping the resulting heap usage of this tree under the provided
++     * number of bytes. This is important for ensuring that we do not allocate overly large trees that could
++     * OOM the JVM and cause instability.
++     *
++     * Calculated using the following logic:
++     *
++     * Let T = size of a tree of depth n
++     *
++     * T = #leafs  * sizeof(leaf) + #inner  * sizeof(inner)
++     * T = 2^n     * L            + (2^n - 1) * I
++     *
++     * T = 2^n * L + 2^n * I - I;
++     *
++     * So to solve for n given sizeof(tree_n) T:
++     *
++     * n = floor(log_2((T + I) / (L + I)))
++     *
++     * @param numBytes: The number of bytes to fit the tree within
++     * @param bytesPerHash: The number of bytes stored in a leaf node, for example 2 * murmur128 will be 256 bits
++     *                    or 32 bytes
++     * @return the estimated depth that will fit within the provided number of bytes
++     */
++    public static int estimatedMaxDepthForBytes(IPartitioner partitioner, long numBytes, int bytesPerHash)
++    {
++        byte[] hashLeft = new byte[bytesPerHash];
++        byte[] hashRight = new byte[bytesPerHash];
++        Leaf left = new Leaf(hashLeft);
++        Leaf right = new Leaf(hashRight);
++        Inner inner = new Inner(partitioner.getMinimumToken(), left, right);
++        inner.calc();
++
++        // Some partitioners have variable token sizes, try to estimate as close as we can by using the same
++        // heap estimate as the memtables use.
++        long innerTokenSize = ObjectSizes.measureDeep(partitioner.getMinimumToken());
++        long realInnerTokenSize = partitioner.getMinimumToken().getHeapSize();
++
++        long sizeOfLeaf = ObjectSizes.measureDeep(left);
++        long sizeOfInner = ObjectSizes.measureDeep(inner) -
++                           (ObjectSizes.measureDeep(left) + ObjectSizes.measureDeep(right) + innerTokenSize) +
++                           realInnerTokenSize;
++
++        long adjustedBytes = Math.max(1, (numBytes + sizeOfInner) / (sizeOfLeaf + sizeOfInner));
++        return Math.max(1, (int) Math.floor(Math.log(adjustedBytes) / Math.log(2)));
++    }
  }
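
Editor's note: the closed form in the Javadoc above, n = floor(log_2((T + I) / (L + I))), can be checked with plain numbers. This is a sketch under stated assumptions: the class TreeDepthEstimate and its methods are hypothetical, and the leaf and inner node sizes (60 and 40 bytes) are made-up values chosen for easy arithmetic, whereas the patch measures real object sizes with ObjectSizes.measureDeep.

```java
public final class TreeDepthEstimate
{
    private TreeDepthEstimate() {}

    /** Heap bytes of a full tree of depth n: 2^n leaves plus (2^n - 1) inner nodes. */
    static long treeBytes(int depth, long sizeOfLeaf, long sizeOfInner)
    {
        long leaves = 1L << depth;
        return leaves * sizeOfLeaf + (leaves - 1) * sizeOfInner;
    }

    /**
     * Largest depth whose tree fits in numBytes, never lower than 1.
     * Same arithmetic as the patch, with node sizes passed in by the caller.
     */
    static int maxDepthForBytes(long numBytes, long sizeOfLeaf, long sizeOfInner)
    {
        long adjusted = Math.max(1, (numBytes + sizeOfInner) / (sizeOfLeaf + sizeOfInner));
        return Math.max(1, (int) Math.floor(Math.log(adjusted) / Math.log(2)));
    }

    public static void main(String[] args)
    {
        long budget = 1_000_000L;
        int depth = maxDepthForBytes(budget, 60, 40);
        // With these assumed sizes: (1000000 + 40) / 100 = 10000, and floor(log2(10000)) = 13
        System.out.println("depth = " + depth);
        System.out.println("bytes at depth     = " + treeBytes(depth, 60, 40));
        System.out.println("bytes at depth + 1 = " + treeBytes(depth + 1, 60, 40));
    }
}
```

With the assumed sizes, a depth-13 tree needs 819,160 bytes and a depth-14 tree 1,638,360 bytes, so 13 is the deepest tree that stays within the 1,000,000-byte budget.
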
diff --cc test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
index 3d88164,4788289..209c35d
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
@@@ -29,13 -31,23 +30,14 @@@ import org.junit.Assert
  import org.junit.BeforeClass;
  import org.junit.Test;
  import org.junit.runner.RunWith;
 +
  import org.apache.cassandra.OrderedJUnit4ClassRunner;
 -import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.exceptions.InvalidRequestException;
 -import org.apache.cassandra.gms.Gossiper;
 -import org.apache.cassandra.schema.KeyspaceMetadata;
 -import org.apache.cassandra.schema.KeyspaceParams;
 -import org.apache.cassandra.service.MigrationManager;
 -import org.apache.cassandra.thrift.ThriftConversion;
  
  import static org.junit.Assert.assertEquals;
 -import static org.junit.Assert.assertNotNull;
 -import static org.junit.Assert.assertNull;
 -import static org.junit.Assert.fail;
 -
  import static org.junit.Assert.assertTrue;
++import static org.junit.Assert.fail;
  
  @RunWith(OrderedJUnit4ClassRunner.class)
  public class DatabaseDescriptorTest
@@@ -208,46 -284,41 +210,113 @@@
          assertTrue(DatabaseDescriptor.tokensFromString(null).isEmpty());
          Collection<String> tokens = DatabaseDescriptor.tokensFromString(" a,b ,c , d, f,g,h");
          assertEquals(7, tokens.size());
 -        assertTrue(tokens.containsAll(Arrays.asList(new String[]{ "a", "b", "c", "d", "f", "g", "h" })));
 +        assertTrue(tokens.containsAll(Arrays.asList(new String[]{"a", "b", "c", "d", "f", "g", "h"})));
 +    }
 +
 +    @Test
 +    public void testLowestAcceptableTimeouts() throws ConfigurationException
 +    {
 +        Config testConfig = new Config();
 +        testConfig.read_request_timeout_in_ms = DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT + 1;
 +        testConfig.range_request_timeout_in_ms = DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT + 1;
 +        testConfig.write_request_timeout_in_ms = DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT + 1;
 +        testConfig.truncate_request_timeout_in_ms = DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT + 1;
 +        testConfig.cas_contention_timeout_in_ms = DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT + 1;
 +        testConfig.counter_write_request_timeout_in_ms = DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT + 1;
 +        testConfig.request_timeout_in_ms = DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT + 1;
 +        
 +        assertTrue(testConfig.read_request_timeout_in_ms > DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.range_request_timeout_in_ms > DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.write_request_timeout_in_ms > DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.truncate_request_timeout_in_ms > DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.cas_contention_timeout_in_ms > DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.counter_write_request_timeout_in_ms > DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.request_timeout_in_ms > DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +
 +        //set less than Lowest acceptable value
 +        testConfig.read_request_timeout_in_ms = DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT - 1;
 +        testConfig.range_request_timeout_in_ms = DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT - 1;
 +        testConfig.write_request_timeout_in_ms = DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT - 1;
 +        testConfig.truncate_request_timeout_in_ms = DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT - 1;
 +        testConfig.cas_contention_timeout_in_ms = DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT - 1;
 +        testConfig.counter_write_request_timeout_in_ms = DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT - 1;
 +        testConfig.request_timeout_in_ms = DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT - 1;
 +
 +        DatabaseDescriptor.checkForLowestAcceptedTimeouts(testConfig);
 +
 +        assertTrue(testConfig.read_request_timeout_in_ms == DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.range_request_timeout_in_ms == DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.write_request_timeout_in_ms == DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.truncate_request_timeout_in_ms == DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.cas_contention_timeout_in_ms == DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.counter_write_request_timeout_in_ms == DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.request_timeout_in_ms == DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +    }
++
++    @Test
++    public void testRepairSessionMemorySizeToggles()
++    {
++        int previousSize = DatabaseDescriptor.getRepairSessionSpaceInMegabytes();
++        try
++        {
++            Assert.assertEquals((Runtime.getRuntime().maxMemory() / (1024 * 1024) / 16),
++                                DatabaseDescriptor.getRepairSessionSpaceInMegabytes());
++
++            int targetSize = (int) (Runtime.getRuntime().maxMemory() / (1024 * 1024) / 4) + 1;
++
++            DatabaseDescriptor.setRepairSessionSpaceInMegabytes(targetSize);
++            Assert.assertEquals(targetSize, DatabaseDescriptor.getRepairSessionSpaceInMegabytes());
++
++            DatabaseDescriptor.setRepairSessionSpaceInMegabytes(10);
++            Assert.assertEquals(10, DatabaseDescriptor.getRepairSessionSpaceInMegabytes());
++
++            try
++            {
++                DatabaseDescriptor.setRepairSessionSpaceInMegabytes(0);
++                fail("Should have received a ConfigurationException for size of 0");
++            }
++            catch (ConfigurationException ignored) { }
++
++            Assert.assertEquals(10, DatabaseDescriptor.getRepairSessionSpaceInMegabytes());
++        }
++        finally
++        {
++            DatabaseDescriptor.setRepairSessionSpaceInMegabytes(previousSize);
++        }
+     }
+ 
+     @Test
+     public void testRepairSessionSizeToggles()
+     {
+         int previousDepth = DatabaseDescriptor.getRepairSessionMaxTreeDepth();
+         try
+         {
 -            Assert.assertEquals(18, DatabaseDescriptor.getRepairSessionMaxTreeDepth());
++            Assert.assertEquals(20, DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+             DatabaseDescriptor.setRepairSessionMaxTreeDepth(10);
+             Assert.assertEquals(10, DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+ 
+             try
+             {
+                 DatabaseDescriptor.setRepairSessionMaxTreeDepth(9);
+                 fail("Should have received a ConfigurationException for depth of 9");
+             }
+             catch (ConfigurationException ignored) { }
+             Assert.assertEquals(10, DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+ 
+             try
+             {
+                 DatabaseDescriptor.setRepairSessionMaxTreeDepth(-20);
+                 fail("Should have received a ConfigurationException for depth of -20");
+             }
+             catch (ConfigurationException ignored) { }
+             Assert.assertEquals(10, DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+ 
+             DatabaseDescriptor.setRepairSessionMaxTreeDepth(22);
+             Assert.assertEquals(22, DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+         }
+         finally
+         {
+             DatabaseDescriptor.setRepairSessionMaxTreeDepth(previousDepth);
+         }
+     }
  }
diff --cc test/unit/org/apache/cassandra/repair/RepairJobTest.java
index 263ecbc,e1dd5b3..6db29dc
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@@ -18,608 -18,309 +18,823 @@@
  
  package org.apache.cassandra.repair;
  
 -import java.net.InetAddress;
  import java.net.UnknownHostException;
+ import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collection;
+ import java.util.Collections;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
  import java.util.UUID;
++import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.TimeUnit;
++import java.util.concurrent.TimeoutException;
 +import java.util.function.Predicate;
+ import java.util.stream.Collectors;
  
 -import com.google.common.util.concurrent.AsyncFunction;
 -import com.google.common.util.concurrent.Futures;
 +import com.google.common.collect.Sets;
+ import com.google.common.util.concurrent.ListenableFuture;
  import org.junit.After;
 +import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.BeforeClass;
  import org.junit.Test;
  
- import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 -import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.dht.ByteOrderedPartitioner;
  import org.apache.cassandra.dht.IPartitioner;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.locator.InetAddressAndPort;
+ import org.apache.cassandra.net.IMessageSink;
+ import org.apache.cassandra.net.MessageIn;
+ import org.apache.cassandra.net.MessageOut;
+ import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.repair.messages.RepairMessage;
+ import org.apache.cassandra.repair.messages.SyncRequest;
+ import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.streaming.PreviewKind;
 +import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.MerkleTree;
  import org.apache.cassandra.utils.MerkleTrees;
+ import org.apache.cassandra.utils.ObjectSizes;
+ import org.apache.cassandra.utils.UUIDGen;
  
  import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertNull;
  import static org.junit.Assert.assertTrue;
  
 -public class RepairJobTest extends SchemaLoader
 +public class RepairJobTest
  {
+     private static final long TEST_TIMEOUT_S = 10;
+     private static final long THREAD_TIMEOUT_MILLIS = 100;
 +    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+     private static final IPartitioner MURMUR3_PARTITIONER = Murmur3Partitioner.instance;
+     private static final String KEYSPACE = "RepairJobTest";
+     private static final String CF = "Standard1";
+     private static final Object messageLock = new Object();
+ 
++    private static final Range<Token> range1 = range(0, 1);
++    private static final Range<Token> range2 = range(2, 3);
++    private static final Range<Token> range3 = range(4, 5);
++    private static final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE, CF, Arrays.asList());
+     private static final List<Range<Token>> fullRange = Collections.singletonList(new Range<>(MURMUR3_PARTITIONER.getMinimumToken(),
 -                                                                                              MURMUR3_PARTITIONER.getRandomToken()));
 -    private static InetAddress addr1;
 -    private static InetAddress addr2;
 -    private static InetAddress addr3;
 -    private static InetAddress addr4;
++                                                                                              MURMUR3_PARTITIONER.getMaximumToken()));
++    private static InetAddressAndPort addr1;
++    private static InetAddressAndPort addr2;
++    private static InetAddressAndPort addr3;
++    private static InetAddressAndPort addr4;
++    private static InetAddressAndPort addr5;
+     private RepairSession session;
+     private RepairJob job;
+     private RepairJobDesc sessionJobDesc;
+ 
+     // So that threads actually get recycled and we can have accurate memory accounting while testing
+     // memory retention from CASSANDRA-14096
+     private static class MeasureableRepairSession extends RepairSession
+     {
 -        public MeasureableRepairSession(UUID parentRepairSession, UUID id, Collection<Range<Token>> ranges,
 -                                        String keyspace,RepairParallelism parallelismDegree, Set<InetAddress> endpoints,
 -                                        long repairedAt, boolean pullRepair, String... cfnames)
++        public MeasureableRepairSession(UUID parentRepairSession, UUID id, CommonRange commonRange, String keyspace,
++                                        RepairParallelism parallelismDegree, boolean isIncremental, boolean pullRepair,
++                                        boolean force, PreviewKind previewKind, boolean optimiseStreams, String... cfnames)
+         {
 -            super(parentRepairSession, id, ranges, keyspace, parallelismDegree, endpoints, repairedAt, pullRepair, cfnames);
++            super(parentRepairSession, id, commonRange, keyspace, parallelismDegree, isIncremental, pullRepair, force, previewKind, optimiseStreams, cfnames);
+         }
+ 
+         protected DebuggableThreadPoolExecutor createExecutor()
+         {
+             DebuggableThreadPoolExecutor executor = super.createExecutor();
+             executor.setKeepAliveTime(THREAD_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 -            return executor;
 -        }
++            return executor;        }
+     }
 -
+     @BeforeClass
+     public static void setupClass() throws UnknownHostException
+     {
+         SchemaLoader.prepareServer();
+         SchemaLoader.createKeyspace(KEYSPACE,
+                                     KeyspaceParams.simple(1),
+                                     SchemaLoader.standardCFMD(KEYSPACE, CF));
 -        addr1 = InetAddress.getByName("127.0.0.1");
 -        addr2 = InetAddress.getByName("127.0.0.2");
 -        addr3 = InetAddress.getByName("127.0.0.3");
 -        addr4 = InetAddress.getByName("127.0.0.4");
++        addr1 = InetAddressAndPort.getByName("127.0.0.1");
++        addr2 = InetAddressAndPort.getByName("127.0.0.2");
++        addr3 = InetAddressAndPort.getByName("127.0.0.3");
++        addr4 = InetAddressAndPort.getByName("127.0.0.4");
++        addr5 = InetAddressAndPort.getByName("127.0.0.5");
+     }
  
-     static InetAddressAndPort addr1;
-     static InetAddressAndPort addr2;
-     static InetAddressAndPort addr3;
-     static InetAddressAndPort addr4;
-     static InetAddressAndPort addr5;
+     @Before
+     public void setup()
+     {
 -        Set<InetAddress> neighbors = new HashSet<>(Arrays.asList(addr2, addr3));
++        Set<InetAddressAndPort> neighbors = new HashSet<>(Arrays.asList(addr2, addr3));
  
-     static Range<Token> range1 = range(0, 1);
-     static Range<Token> range2 = range(2, 3);
-     static Range<Token> range3 = range(4, 5);
-     static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+         UUID parentRepairSession = UUID.randomUUID();
 -        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(),
++        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddressAndPort(),
+                                                                  Collections.singletonList(Keyspace.open(KEYSPACE).getColumnFamilyStore(CF)), fullRange, false,
 -                                                                 ActiveRepairService.UNREPAIRED_SSTABLE, false);
++                                                                 ActiveRepairService.UNREPAIRED_SSTABLE, false, PreviewKind.NONE);
+ 
 -        this.session = new MeasureableRepairSession(parentRepairSession, UUIDGen.getTimeUUID(), fullRange,
 -                                                    KEYSPACE, RepairParallelism.SEQUENTIAL, neighbors,
 -                                                    ActiveRepairService.UNREPAIRED_SSTABLE, false, CF);
++        this.session = new MeasureableRepairSession(parentRepairSession, UUIDGen.getTimeUUID(),
++                                                    new CommonRange(neighbors, Collections.emptySet(), fullRange),
++                                                    KEYSPACE, RepairParallelism.SEQUENTIAL,
++                                                    false, false, false,
++                                                    PreviewKind.NONE, false, CF);
+ 
+         this.job = new RepairJob(session, CF);
+         this.sessionJobDesc = new RepairJobDesc(session.parentRepairSession, session.getId(),
 -                                                session.keyspace, CF, session.getRanges());
++                                                session.keyspace, CF, session.ranges());
+ 
 -        DatabaseDescriptor.setBroadcastAddress(addr1);
++        FBUtilities.setBroadcastInetAddress(addr1.address);
+     }
  
      @After
      public void reset()
      {
+         ActiveRepairService.instance.terminateSessions();
+         MessagingService.instance().clearMessageSinks();
 +        FBUtilities.reset();
-         DatabaseDescriptor.setBroadcastAddress(addr1.address);
      }
  
-     static
+     /**
 -     * Ensure we can do an end to end repair of consistent data and get the messages we expect
++     * Ensure RepairJob issues the right messages in an end to end repair of consistent data
+      */
+     @Test
 -    public void testEndToEndNoDifferences() throws Exception
++    public void testEndToEndNoDifferences() throws InterruptedException, ExecutionException, TimeoutException
      {
-         try
-         {
-             addr1 = InetAddressAndPort.getByName("127.0.0.1");
-             addr2 = InetAddressAndPort.getByName("127.0.0.2");
-             addr3 = InetAddressAndPort.getByName("127.0.0.3");
-             addr4 = InetAddressAndPort.getByName("127.0.0.4");
-             addr5 = InetAddressAndPort.getByName("127.0.0.5");
-             DatabaseDescriptor.setBroadcastAddress(addr1.address);
-         }
-         catch (UnknownHostException e)
 -        Map<InetAddress, MerkleTrees> mockTrees = new HashMap<>();
 -        mockTrees.put(FBUtilities.getBroadcastAddress(), createInitialTree(false));
++        Map<InetAddressAndPort, MerkleTrees> mockTrees = new HashMap<>();
++        mockTrees.put(FBUtilities.getBroadcastAddressAndPort(), createInitialTree(false));
+         mockTrees.put(addr2, createInitialTree(false));
+         mockTrees.put(addr3, createInitialTree(false));
+ 
+         List<MessageOut> observedMessages = new ArrayList<>();
+         interceptRepairMessages(mockTrees, observedMessages);
+ 
+         job.run();
+ 
+         RepairResult result = job.get(TEST_TIMEOUT_S, TimeUnit.SECONDS);
+ 
 -        assertEquals(3, result.stats.size());
 -        // Should be one RemoteSyncTask left behind (other two should be local)
 -        assertExpectedDifferences(session.getSyncingTasks().values(), 0);
++        // Since there are no differences, there should be nothing to sync.
++        assertEquals(0, result.stats.size());
+ 
+         // RepairJob should send out SNAPSHOTS -> VALIDATIONS -> done
+         List<RepairMessage.Type> expectedTypes = new ArrayList<>();
+         for (int i = 0; i < 3; i++)
+             expectedTypes.add(RepairMessage.Type.SNAPSHOT);
+         for (int i = 0; i < 3; i++)
+             expectedTypes.add(RepairMessage.Type.VALIDATION_REQUEST);
+ 
+         assertEquals(expectedTypes, observedMessages.stream()
+                                                     .map(k -> ((RepairMessage) k.payload).messageType)
+                                                     .collect(Collectors.toList()));
+     }
+ 
+     /**
+      * Regression test for CASSANDRA-14096. We should not retain memory in the RepairSession once the
+      * ValidationTask -> SyncTask transform is done.
+      */
+     @Test
+     public void testNoTreesRetainedAfterDifference() throws Throwable
+     {
 -        Map<InetAddress, MerkleTrees> mockTrees = new HashMap<>();
 -        mockTrees.put(FBUtilities.getBroadcastAddress(), createInitialTree(false));
 -        mockTrees.put(addr2, createInitialTree(true));
++        Map<InetAddressAndPort, MerkleTrees> mockTrees = new HashMap<>();
++        mockTrees.put(addr1, createInitialTree(true));
++        mockTrees.put(addr2, createInitialTree(false));
+         mockTrees.put(addr3, createInitialTree(false));
+ 
 -        List<MessageOut> observedMessages = new ArrayList<>();
 -        interceptRepairMessages(mockTrees, observedMessages);
 -
+         List<TreeResponse> mockTreeResponses = mockTrees.entrySet().stream()
+                                                         .map(e -> new TreeResponse(e.getKey(), e.getValue()))
+                                                         .collect(Collectors.toList());
++        List<MessageOut> messages = new ArrayList<>();
++        interceptRepairMessages(mockTrees, messages);
+ 
 -        long singleTreeSize = ObjectSizes.measureDeep(mockTrees.get(addr2));
++        long singleTreeSize = ObjectSizes.measureDeep(mockTrees.get(addr1));
+ 
 -        // Use a different local address so we get all RemoteSyncs (as LocalSyncs try to reach out over the network).
 -        List<SyncTask> syncTasks = job.createSyncTasks(mockTreeResponses, addr4);
++        // Use addr4 instead of one of the provided trees to force everything to be remote sync tasks as
++        // LocalSyncTasks try to reach over the network.
++        List<SyncTask> syncTasks = RepairJob.createStandardSyncTasks(sessionJobDesc, mockTreeResponses,
++                                                                     addr4, // local
++                                                                     noTransient(),
++                                                                     session.isIncremental,
++                                                                     session.pullRepair,
++                                                                     session.previewKind);
+ 
+         // SyncTasks themselves should not contain significant memory
 -        assertTrue(ObjectSizes.measureDeep(syncTasks) < 0.8 * singleTreeSize);
++        assertTrue(ObjectSizes.measureDeep(syncTasks) < 0.2 * singleTreeSize);
+ 
 -        ListenableFuture<List<SyncStat>> syncResults = Futures.transform(Futures.immediateFuture(mockTreeResponses), new AsyncFunction<List<TreeResponse>, List<SyncStat>>()
 -        {
 -            public ListenableFuture<List<SyncStat>> apply(List<TreeResponse> treeResponses)
 -            {
 -                return Futures.allAsList(syncTasks);
 -            }
 -        }, session.taskExecutor);
++        ListenableFuture<List<SyncStat>> syncResults = job.executeTasks(syncTasks);
+ 
 -        // The session can retain memory in the contained executor until the threads expire, so we wait for the threads
++        // Immediately following execution the internal execution queue should still retain the trees
++        assertTrue(ObjectSizes.measureDeep(session) > singleTreeSize);
++
++        // The session retains memory in the contained executor until the threads expire, so we wait for the threads
+         // that ran the Tree -> SyncTask conversions to die and release the memory
+         int millisUntilFreed;
+         for (millisUntilFreed = 0; millisUntilFreed < TEST_TIMEOUT_S * 1000; millisUntilFreed += THREAD_TIMEOUT_MILLIS)
          {
-             e.printStackTrace();
+             // The measured size of the syncingTasks, and result of the computation should be much smaller
++            TimeUnit.MILLISECONDS.sleep(THREAD_TIMEOUT_MILLIS);
+             if (ObjectSizes.measureDeep(session) < 0.8 * singleTreeSize)
+                 break;
 -            TimeUnit.MILLISECONDS.sleep(THREAD_TIMEOUT_MILLIS);
          }
+ 
+         assertTrue(millisUntilFreed < TEST_TIMEOUT_S * 1000);
+ 
+         List<SyncStat> results = syncResults.get(TEST_TIMEOUT_S, TimeUnit.SECONDS);
+ 
 -        assertTrue(ObjectSizes.measureDeep(results) < 0.8 * singleTreeSize);
++        assertTrue(ObjectSizes.measureDeep(results) < 0.2 * singleTreeSize);
++
++        assertEquals(2, results.size());
++        assertEquals(0, session.getSyncingTasks().size());
++        assertTrue(results.stream().allMatch(s -> s.numberOfDifferences == 1));
++
++        assertEquals(2, messages.size());
++        assertTrue(messages.stream().allMatch(m -> ((RepairMessage) m.payload).messageType == RepairMessage.Type.SYNC_REQUEST));
 +    }
 +
 +    @Test
 +    public void testCreateStandardSyncTasks()
 +    {
 +        testCreateStandardSyncTasks(false);
 +    }
 +
 +    @Test
 +    public void testCreateStandardSyncTasksPullRepair()
 +    {
 +        testCreateStandardSyncTasks(true);
 +    }
 +
 +    public static void testCreateStandardSyncTasks(boolean pullRepair)
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
 +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
 +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
 +
 +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
 +                                                                                    treeResponses,
 +                                                                                    addr1, // local
 +                                                                                    noTransient(), // transient
 +                                                                                    false,
 +                                                                                    pullRepair,
 +                                                                                    PreviewKind.ALL));
 +
 +        Assert.assertEquals(2, tasks.size());
 +
 +        SyncTask task = tasks.get(pair(addr1, addr2));
 +        Assert.assertTrue(task.isLocal());
 +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
 +        Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
 +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
 +
 +        task = tasks.get(pair(addr2, addr3));
 +        Assert.assertFalse(task.isLocal());
 +        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
 +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
 +
 +        Assert.assertNull(tasks.get(pair(addr1, addr3)));
 +    }
 +
 +    @Test
 +    public void testStandardSyncTransient()
 +    {
 +        // Do not stream towards transient nodes
 +        testStandardSyncTransient(true);
 +        testStandardSyncTransient(false);
 +    }
 +
 +    public void testStandardSyncTransient(boolean pullRepair)
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
 +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
 +
 +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
 +                                                                                    treeResponses,
 +                                                                                    addr1, // local
 +                                                                                    transientPredicate(addr2),
 +                                                                                    false,
 +                                                                                    pullRepair,
 +                                                                                    PreviewKind.ALL));
 +
 +        Assert.assertEquals(1, tasks.size());
 +
 +        SyncTask task = tasks.get(pair(addr1, addr2));
 +        Assert.assertTrue(task.isLocal());
 +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
 +        Assert.assertFalse(((LocalSyncTask) task).transferRanges);
 +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
 +    }
 +
 +    @Test
 +    public void testStandardSyncLocalTransient()
 +    {
 +        // Do not stream towards transient nodes
 +        testStandardSyncLocalTransient(true);
 +        testStandardSyncLocalTransient(false);
 +    }
 +
 +    public void testStandardSyncLocalTransient(boolean pullRepair)
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
 +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
 +
 +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
 +                                                                                    treeResponses,
 +                                                                                    addr1, // local
 +                                                                                    transientPredicate(addr1),
 +                                                                                    false,
 +                                                                                    pullRepair,
 +                                                                                    PreviewKind.ALL));
 +
 +        if (pullRepair)
 +        {
 +            Assert.assertTrue(tasks.isEmpty());
 +            return;
 +        }
 +
 +        Assert.assertEquals(1, tasks.size());
 +
 +        SyncTask task = tasks.get(pair(addr1, addr2));
 +        Assert.assertTrue(task.isLocal());
 +        Assert.assertFalse(((LocalSyncTask) task).requestRanges);
 +        Assert.assertTrue(((LocalSyncTask) task).transferRanges);
 +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
 +    }
 +
 +    @Test
 +    public void testEmptyDifference()
 +    {
 +        // one of the nodes is a local coordinator
 +        testEmptyDifference(addr1, noTransient(), true);
 +        testEmptyDifference(addr1, noTransient(), false);
 +        testEmptyDifference(addr2, noTransient(), true);
 +        testEmptyDifference(addr2, noTransient(), false);
 +        testEmptyDifference(addr1, transientPredicate(addr1), true);
 +        testEmptyDifference(addr2, transientPredicate(addr1), true);
 +        testEmptyDifference(addr1, transientPredicate(addr1), false);
 +        testEmptyDifference(addr2, transientPredicate(addr1), false);
 +        testEmptyDifference(addr1, transientPredicate(addr2), true);
 +        testEmptyDifference(addr2, transientPredicate(addr2), true);
 +        testEmptyDifference(addr1, transientPredicate(addr2), false);
 +        testEmptyDifference(addr2, transientPredicate(addr2), false);
 +
 +        // nonlocal coordinator
 +        testEmptyDifference(addr3, noTransient(), true);
 +        testEmptyDifference(addr3, noTransient(), false);
 +        testEmptyDifference(addr3, noTransient(), true);
 +        testEmptyDifference(addr3, noTransient(), false);
 +        testEmptyDifference(addr3, transientPredicate(addr1), true);
 +        testEmptyDifference(addr3, transientPredicate(addr1), true);
 +        testEmptyDifference(addr3, transientPredicate(addr1), false);
 +        testEmptyDifference(addr3, transientPredicate(addr1), false);
 +        testEmptyDifference(addr3, transientPredicate(addr2), true);
 +        testEmptyDifference(addr3, transientPredicate(addr2), true);
 +        testEmptyDifference(addr3, transientPredicate(addr2), false);
 +        testEmptyDifference(addr3, transientPredicate(addr2), false);
 +    }
 +
 +    public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
 +                                                         treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
 +
 +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
 +                                                                                    treeResponses,
 +                                                                                    local, // local
 +                                                                                    isTransient,
 +                                                                                    false,
 +                                                                                    pullRepair,
 +                                                                                    PreviewKind.ALL));
 +
 +        Assert.assertTrue(tasks.isEmpty());
 +    }
 +
 +    @Test
 +    public void testCreateStandardSyncTasksAllDifferent()
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
 +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
 +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
 +
 +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
 +                                                                                    treeResponses,
 +                                                                                    addr1, // local
 +                                                                                    ep -> ep.equals(addr3), // transient
 +                                                                                    false,
 +                                                                                    true,
 +                                                                                    PreviewKind.ALL));
 +
 +        Assert.assertEquals(3, tasks.size());
 +        SyncTask task = tasks.get(pair(addr1, addr2));
 +        Assert.assertTrue(task.isLocal());
 +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
 +
 +        task = tasks.get(pair(addr2, addr3));
 +        Assert.assertFalse(task.isLocal());
 +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
 +
 +        task = tasks.get(pair(addr1, addr3));
 +        Assert.assertTrue(task.isLocal());
 +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
 +    }
 +
 +    @Test
 +    public void testCreate5NodeStandardSyncTasksWithTransient()
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
 +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
 +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
 +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
 +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
 +
 +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
 +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
 +                                                                                    treeResponses,
 +                                                                                    addr1, // local
 +                                                                                    isTransient, // transient
 +                                                                                    false,
 +                                                                                    true,
 +                                                                                    PreviewKind.ALL));
 +
 +        SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
 +                                                   pair(addr1, addr3),
 +                                                   pair(addr1, addr4),
 +                                                   pair(addr1, addr5),
 +                                                   pair(addr2, addr4),
 +                                                   pair(addr2, addr4),
 +                                                   pair(addr2, addr5),
 +                                                   pair(addr3, addr4),
 +                                                   pair(addr3, addr5)};
 +
 +        for (SyncNodePair pair : pairs)
 +        {
 +            SyncTask task = tasks.get(pair);
 +            // Local only if addr1 is a coordinator
 +            assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
 +
 +            boolean isRemote = !pair.coordinator.equals(addr1) && !pair.peer.equals(addr1);
 +            boolean involvesTransient = isTransient.test(pair.coordinator) || isTransient.test(pair.peer);
 +            assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",pair.coordinator, pair.peer),
 +                         isRemote && involvesTransient,
 +                         task instanceof AsymmetricRemoteSyncTask);
 +
 +            // All ranges to be synchronised
 +            Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
 +        }
 +    }
  
 -        assertEquals(3, results.size());
 -        // Should be two RemoteSyncTasks with ranges and one empty one
 -        assertExpectedDifferences(new ArrayList<>(session.getSyncingTasks().values()), 1, 1, 0);
 +    @Test
 +    public void testLocalSyncWithTransient()
 +    {
-         try
-         {
-             for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
-             {
-                 FBUtilities.reset();
-                 DatabaseDescriptor.setBroadcastAddress(local.address);
-                 testLocalSyncWithTransient(local, false);
-             }
-         }
-         finally
++        for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
 +        {
 +            FBUtilities.reset();
-             DatabaseDescriptor.setBroadcastAddress(addr1.address);
++            FBUtilities.setBroadcastInetAddress(local.address);
++            testLocalSyncWithTransient(local, false);
 +        }
 +    }
  
 -        int numDifferent = 0;
 -        for (SyncStat stat : results)
 +    @Test
 +    public void testLocalSyncWithTransientPullRepair()
 +    {
-         try
-         {
-             for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
-             {
-                 FBUtilities.reset();
-                 DatabaseDescriptor.setBroadcastAddress(local.address);
-                 testLocalSyncWithTransient(local, true);
-             }
-         }
-         finally
++        for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
          {
 -            if (stat.nodes.endpoint1.equals(addr2) || stat.nodes.endpoint2.equals(addr2))
 -            {
 -                assertEquals(1, stat.numberOfDifferences);
 -                numDifferent++;
 -            }
 +            FBUtilities.reset();
-             DatabaseDescriptor.setBroadcastAddress(addr1.address);
++            FBUtilities.setBroadcastInetAddress(local.address);
++            testLocalSyncWithTransient(local, true);
 +        }
- 
 +    }
 +
 +    public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
 +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
 +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
 +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
 +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
 +
 +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
 +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
 +                                                                                    treeResponses,
 +                                                                                    local, // local
 +                                                                                    isTransient, // transient
 +                                                                                    false,
 +                                                                                    pullRepair,
 +                                                                                    PreviewKind.ALL));
 +
 +        assertEquals(9, tasks.size());
 +        for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
 +        {
 +            if (local.equals(addr))
 +                continue;
 +
 +            LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
 +            assertTrue(task.requestRanges);
 +            assertEquals(!pullRepair, task.transferRanges);
 +        }
 +
 +        LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
 +        assertTrue(task.requestRanges);
 +        assertFalse(task.transferRanges);
 +
 +        task = (LocalSyncTask) tasks.get(pair(local, addr5));
 +        assertTrue(task.requestRanges);
 +        assertFalse(task.transferRanges);
 +    }
 +
 +    @Test
 +    public void testLocalAndRemoteTransient()
 +    {
 +        testLocalAndRemoteTransient(false);
 +    }
 +
 +    @Test
 +    public void testLocalAndRemoteTransientPullRepair()
 +    {
 +        testLocalAndRemoteTransient(true);
 +    }
 +
 +    private static void testLocalAndRemoteTransient(boolean pullRepair)
 +    {
-         DatabaseDescriptor.setBroadcastAddress(addr4.address);
++        FBUtilities.setBroadcastInetAddress(addr4.address);
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
 +                                                         treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
 +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
 +                                                         treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
 +                                                         treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
 +
 +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
 +                                                                                    treeResponses,
 +                                                                                    addr4, // local
 +                                                                                    ep -> ep.equals(addr4) || ep.equals(addr5), // transient
 +                                                                                    false,
 +                                                                                    pullRepair,
 +                                                                                    PreviewKind.ALL));
 +
 +        assertNull(tasks.get(pair(addr4, addr5)));
 +    }
 +
 +    @Test
 +    public void testOptimizedCreateStandardSyncTasksAllDifferent()
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
 +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
 +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
 +
 +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
 +                                                                                            treeResponses,
 +                                                                                            addr1, // local
 +                                                                                            noTransient(),
 +                                                                                            addr -> "DC1",
 +                                                                                            false,
 +                                                                                            PreviewKind.ALL));
 +
 +        for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
 +                                                     pair(addr1, addr3),
 +                                                     pair(addr2, addr1),
 +                                                     pair(addr2, addr3),
 +                                                     pair(addr3, addr1),
 +                                                     pair(addr3, addr2) })
 +        {
 +            assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
 +        }
 +    }
 +
 +    @Test
 +    public void testOptimizedCreateStandardSyncTasks()
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one"),
 +                                                         treeResponse(addr2, range1, "one",   range2, "two"),
 +                                                         treeResponse(addr3, range1, "three", range2, "two"));
 +
 +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
 +                                                                                            treeResponses,
 +                                                                                            addr4, // local
 +                                                                                            noTransient(),
 +                                                                                            addr -> "DC1",
 +                                                                                            false,
 +                                                                                            PreviewKind.ALL));
 +
 +        for (SyncTask task : tasks.values())
 +            assertTrue(task instanceof AsymmetricRemoteSyncTask);
 +
 +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, addr3)).rangesToSync);
 +        // addr1 can get range2 from either addr2 or addr3 but not from both
 +        assertStreamRangeFromEither(tasks, Arrays.asList(range2),
 +                                    addr1, addr2, addr3);
 +
 +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr2, addr3)).rangesToSync);
 +        assertEquals(Arrays.asList(range2), tasks.get(pair(addr2, addr1)).rangesToSync);
 +
 +        // addr3 can get range1 from either addr1 or addr2 but not from both
 +        assertStreamRangeFromEither(tasks, Arrays.asList(range1),
 +                                    addr3, addr2, addr1);
 +        assertEquals(Arrays.asList(range2), tasks.get(pair(addr3, addr1)).rangesToSync);
 +    }
 +
 +    @Test
 +    public void testOptimizedCreateStandardSyncTasksWithTransient()
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
 +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
 +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
 +
 +        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
 +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
 +                                                                                            treeResponses,
 +                                                                                            addr1, // local
 +                                                                                            ep -> ep.equals(addr3),
 +                                                                                            addr -> "DC1",
 +                                                                                            false,
 +                                                                                            PreviewKind.ALL));
 +
 +        assertEquals(3, tasks.size());
 +        SyncTask task = tasks.get(pair(addr1, addr2));
 +        assertTrue(task.isLocal());
 +        assertElementEquals(Arrays.asList(range1, range3), task.rangesToSync);
 +        assertTrue(((LocalSyncTask)task).requestRanges);
 +        assertFalse(((LocalSyncTask)task).transferRanges);
 +
 +        assertStreamRangeFromEither(tasks, Arrays.asList(range3),
 +                                    addr2, addr1, addr3);
 +
-         task = tasks.get(pair(addr2, addr3));
-         assertFalse(task.isLocal());
-         assertElementEquals(Arrays.asList(range1), task.rangesToSync);
++        assertStreamRangeFromEither(tasks, Arrays.asList(range1),
++                                    addr2, addr1, addr3);
 +    }
 +
 +    // Asserts that ranges are streamed from exactly one of the two nodes, not from both
 +    public static void assertStreamRangeFromEither(Map<SyncNodePair, SyncTask> tasks, List<Range<Token>> ranges,
 +                                                   InetAddressAndPort target, InetAddressAndPort either, InetAddressAndPort or)
 +    {
 +        InetAddressAndPort streamsFrom;
 +        InetAddressAndPort doesntStreamFrom;
-         if (tasks.containsKey(pair(target, either)))
++        if (tasks.containsKey(pair(target, either)) && tasks.get(pair(target, either)).rangesToSync.equals(ranges))
 +        {
 +            streamsFrom = either;
 +            doesntStreamFrom = or;
          }
 -        assertEquals(2, numDifferent);
 +        else
 +        {
 +            doesntStreamFrom = either;
 +            streamsFrom = or;
 +        }
 +
 +        SyncTask task = tasks.get(pair(target, streamsFrom));
 +        assertTrue(task instanceof AsymmetricRemoteSyncTask);
 +        assertElementEquals(ranges, task.rangesToSync);
 +        assertDoesntStreamRangeFrom(tasks, ranges, target, doesntStreamFrom);
 +    }
 +
 +    public static void assertDoesntStreamRangeFrom(Map<SyncNodePair, SyncTask> tasks, List<Range<Token>> ranges,
 +                                                   InetAddressAndPort target, InetAddressAndPort source)
 +    {
 +        Set<Range<Token>> rangeSet = new HashSet<>(ranges);
 +        SyncTask task = tasks.get(pair(target, source));
 +        if (task == null)
 +            return; // Doesn't stream anything
 +
 +        for (Range<Token> range : task.rangesToSync)
 +        {
 +            assertFalse(String.format("%s shouldn't stream %s from %s",
 +                                      target, range, source),
 +                        rangeSet.contains(range));
 +        }
 +    }
 +
 +    public static <T> void assertElementEquals(Collection<T> col1, Collection<T> col2)
 +    {
 +        Set<T> set1 = new HashSet<>(col1);
 +        Set<T> set2 = new HashSet<>(col2);
 +        Set<T> difference = Sets.difference(set1, set2);
 +        assertTrue("Expected empty difference but got: " + difference.toString(),
 +                   difference.isEmpty());
 +    }
 +
 +    public static Token tk(int i)
 +    {
 +        return PARTITIONER.getToken(ByteBufferUtil.bytes(i));
 +    }
 +
 +    public static Range<Token> range(int from, int to)
 +    {
 +        return new Range<>(tk(from), tk(to));
 +    }
 +
 +    public static TreeResponse treeResponse(InetAddressAndPort addr, Object... rangesAndHashes)
 +    {
 +        MerkleTrees trees = new MerkleTrees(PARTITIONER);
 +        for (int i = 0; i < rangesAndHashes.length; i += 2)
 +        {
 +            Range<Token> range = (Range<Token>) rangesAndHashes[i];
 +            String hash = (String) rangesAndHashes[i + 1];
 +            MerkleTree tree = trees.addMerkleTree(2, MerkleTree.RECOMMENDED_DEPTH, range);
 +            tree.get(range.left).hash(hash.getBytes());
 +        }
 +
 +        return new TreeResponse(addr, trees);
 +    }
 +
 +    public static SyncNodePair pair(InetAddressAndPort node1, InetAddressAndPort node2)
 +    {
 +        return new SyncNodePair(node1, node2);
 +    }
 +
 +    public static Map<SyncNodePair, SyncTask> toMap(List<SyncTask> tasks)
 +    {
 +        Map<SyncNodePair, SyncTask> map = new HashMap();
 +        for (SyncTask task : tasks)
 +        {
 +            SyncTask oldTask = map.put(task.nodePair, task);
 +            Assert.assertNull(String.format("\nNode pair: %s\nOld task:  %s\nNew task:  %s\n",
 +                                            task.nodePair,
 +                                            oldTask,
 +                                            task),
 +                              oldTask);
 +        }
 +        return map;
 +    }
 +
 +    public static Predicate<InetAddressAndPort> transientPredicate(InetAddressAndPort... transientNodes)
 +    {
 +        Set<InetAddressAndPort> set = new HashSet<>();
 +        for (InetAddressAndPort node : transientNodes)
 +            set.add(node);
 +
 +        return set::contains;
      }
  
 -    private void assertExpectedDifferences(Collection<RemoteSyncTask> tasks, Integer ... differences)
 +    public static Predicate<InetAddressAndPort> noTransient()
      {
 -        List<Integer> expectedDifferences = new ArrayList<>(Arrays.asList(differences));
 -        List<Integer> observedDifferences = tasks.stream()
 -                                                 .map(t -> (int) t.getCurrentStat().numberOfDifferences)
 -                                                 .collect(Collectors.toList());
 -        assertEquals(expectedDifferences.size(), observedDifferences.size());
 -        assertTrue(expectedDifferences.containsAll(observedDifferences));
 +        return node -> false;
      }
+ 
+     private MerkleTrees createInitialTree(boolean invalidate)
+     {
+         MerkleTrees tree = new MerkleTrees(MURMUR3_PARTITIONER);
+         tree.addMerkleTrees((int) Math.pow(2, 15), fullRange);
+         tree.init();
+         for (MerkleTree.TreeRange r : tree.invalids())
+         {
+             r.ensureHashInitialised();
+         }
+ 
+         if (invalidate)
+         {
+             // change a range in one of the trees
+             Token token = MURMUR3_PARTITIONER.midpoint(fullRange.get(0).left, fullRange.get(0).right);
+             tree.invalidate(token);
+             tree.get(token).hash("non-empty hash!".getBytes());
+         }
+ 
+         return tree;
+     }
+ 
 -    private void interceptRepairMessages(Map<InetAddress, MerkleTrees> mockTrees,
++    private void interceptRepairMessages(Map<InetAddressAndPort, MerkleTrees> mockTrees,
+                                          List<MessageOut> messageCapture)
+     {
+         MessagingService.instance().addMessageSink(new IMessageSink()
+         {
 -            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
++            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to)
+             {
+                 if (message == null || !(message.payload instanceof RepairMessage))
+                     return false;
+ 
+                 // Synchronize so that messages from different threads don't overwrite each other.
+                 synchronized (messageLock)
+                 {
+                     messageCapture.add(message);
+                 }
+ 
+                 RepairMessage rm = (RepairMessage) message.payload;
+                 switch (rm.messageType)
+                 {
+                     case SNAPSHOT:
+                         MessageIn<?> messageIn = MessageIn.create(to, null,
+                                                                   Collections.emptyMap(),
+                                                                   MessagingService.Verb.REQUEST_RESPONSE,
+                                                                   MessagingService.current_version);
+                         MessagingService.instance().receive(messageIn, id);
+                         break;
+                     case VALIDATION_REQUEST:
+                         session.validationComplete(sessionJobDesc, to, mockTrees.get(to));
+                         break;
+                     case SYNC_REQUEST:
+                         SyncRequest syncRequest = (SyncRequest) rm;
 -                        session.syncComplete(sessionJobDesc, new NodePair(syncRequest.src, syncRequest.dst), true);
++                        session.syncComplete(sessionJobDesc, new SyncNodePair(syncRequest.src, syncRequest.dst),
++                                             true, Collections.emptyList());
+                         break;
+                     default:
+                         break;
+                 }
+                 return false;
+             }
+ 
+             public boolean allowIncomingMessage(MessageIn message, int id)
+             {
+                 return message.verb == MessagingService.Verb.REQUEST_RESPONSE;
+             }
+         });
+     }
  }
diff --cc test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 3b582a9,9c32cef..ff6b11c
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@@ -17,22 -17,22 +17,24 @@@
   */
  package org.apache.cassandra.repair;
  
--import java.net.InetAddress;
 +import java.nio.ByteBuffer;
++import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collections;
  import java.util.Iterator;
++import java.util.List;
  import java.util.Map;
  import java.util.UUID;
  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.TimeUnit;
  
 -import com.google.common.util.concurrent.ListenableFuture;
 -import com.google.common.util.concurrent.SettableFuture;
 +import com.google.common.hash.Hasher;
  
--import org.apache.cassandra.db.compaction.CompactionManager;
++import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.compaction.CompactionsTest;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.junit.After;
++import org.junit.Before;
  import org.junit.BeforeClass;
  import org.junit.Test;
  
@@@ -70,6 -68,6 +72,7 @@@ import static org.junit.Assert.assertTr
  public class ValidatorTest
  {
      private static final long TEST_TIMEOUT = 60; //seconds
++    private static int testSizeMegabytes;
  
      private static final String keyspace = "ValidatorTest";
      private static final String columnFamily = "Standard1";
@@@ -82,13 -80,13 +85,21 @@@
          SchemaLoader.createKeyspace(keyspace,
                                      KeyspaceParams.simple(1),
                                      SchemaLoader.standardCFMD(keyspace, columnFamily));
 -        partitioner = Schema.instance.getCFMetaData(keyspace, columnFamily).partitioner;
 +        partitioner = Schema.instance.getTableMetadata(keyspace, columnFamily).partitioner;
++        testSizeMegabytes = DatabaseDescriptor.getRepairSessionSpaceInMegabytes();
      }
  
      @After
      public void tearDown()
      {
          MessagingService.instance().clearMessageSinks();
++        DatabaseDescriptor.setRepairSessionSpaceInMegabytes(testSizeMegabytes);
++    }
++
++    @Before
++    public void setup()
++    {
++        DatabaseDescriptor.setRepairSessionSpaceInMegabytes(testSizeMegabytes);
      }
  
      @Test
@@@ -217,45 -215,6 +228,188 @@@
          assertEquals(trees.rowCount(), n);
      }
  
++    /*
++     * Test for CASSANDRA-14096 size limiting. We:
++     * 1. Limit the size of a repair session
++     * 2. Submit a validation
++     * 3. Check that the resulting tree is of limited depth
++     */
++    @Test
++    public void testSizeLimiting() throws Exception
++    {
++        Keyspace ks = Keyspace.open(keyspace);
++        ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
++        cfs.clearUnsafe();
++
++        DatabaseDescriptor.setRepairSessionSpaceInMegabytes(1);
++
++        // disable compaction while flushing
++        cfs.disableAutoCompaction();
++
++        // 2 ** 14 rows would normally use 2^14 leaves, but with only 1 meg we should only use 2^12
++        CompactionsTest.populate(keyspace, columnFamily, 0, 1 << 14, 0);
++
++        cfs.forceBlockingFlush();
++        assertEquals(1, cfs.getLiveSSTables().size());
++
++        // wait enough to force single compaction
++        TimeUnit.SECONDS.sleep(5);
++
++        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
++        UUID repairSessionId = UUIDGen.getTimeUUID();
++        final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
++                                                     cfs.getTableName(), Collections.singletonList(new Range<>(sstable.first.getToken(),
++                                                                                                               sstable.last.getToken())));
++
++        ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddressAndPort(),
++                                                                 Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE,
++                                                                 false, PreviewKind.NONE);
++
++        final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddressAndPort(), 0, true, false, PreviewKind.NONE);
++        ValidationManager.instance.submitValidation(cfs, validator);
++
++        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
++        MerkleTrees trees = ((ValidationComplete) message.payload).trees;
++
++        Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = trees.iterator();
++        int numTrees = 0;
++        while (iterator.hasNext())
++        {
++            assertEquals(1 << 12, iterator.next().getValue().size(), 0.0);
++            numTrees++;
++        }
++        assertEquals(1, numTrees);
++
++        assertEquals(trees.rowCount(), 1 << 14);
++    }
++
++    /*
++     * Test for CASSANDRA-11390. When there are multiple subranges the trees should
++     * automatically size down to make each subrange fit in the provided memory
++     * 1. Limit the size of all the trees
++     * 2. Submit a validation against more than one range
++     * 3. Check that we have the right number and sizes of trees
++     */
++    @Test
++    public void testRangeSplittingTreeSizeLimit() throws Exception
++    {
++        Keyspace ks = Keyspace.open(keyspace);
++        ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
++        cfs.clearUnsafe();
++
++        DatabaseDescriptor.setRepairSessionSpaceInMegabytes(1);
++
++        // disable compaction while flushing
++        cfs.disableAutoCompaction();
++
++        // 2 ** 14 rows would normally use 2^14 leaves, but with only 1 meg we should only use 2^12
++        CompactionsTest.populate(keyspace, columnFamily, 0, 1 << 14, 0);
++
++        cfs.forceBlockingFlush();
++        assertEquals(1, cfs.getLiveSSTables().size());
++
++        // wait enough to force single compaction
++        TimeUnit.SECONDS.sleep(5);
++
++        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
++        UUID repairSessionId = UUIDGen.getTimeUUID();
++
++        List<Range<Token>> ranges = splitHelper(new Range<>(sstable.first.getToken(), sstable.last.getToken()), 2);
++
++
++        final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
++                                                     cfs.getTableName(), ranges);
++
++        ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddressAndPort(),
++                                                                 Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE,
++                                                                 false, PreviewKind.NONE);
++
++        final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddressAndPort(), 0, true, false, PreviewKind.NONE);
++        ValidationManager.instance.submitValidation(cfs, validator);
++
++        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
++        MerkleTrees trees = ((ValidationComplete) message.payload).trees;
++
++        // Should have 4 trees each with a depth of on average 10 (since each range should have gotten 0.25 megabytes)
++        Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = trees.iterator();
++        int numTrees = 0;
++        double totalResolution = 0;
++        while (iterator.hasNext())
++        {
++            long size = iterator.next().getValue().size();
++            // SSTable range estimates are quite variable: depending on the sampling, we can
++            // get a wide range of values here. So we just make sure that each tree is smaller than in
++            // the single-range case and that the total size is right.
++            assertTrue(size <= (1 << 11));
++            assertTrue(size >= (1 << 9));
++            totalResolution += size;
++            numTrees += 1;
++        }
++
++        assertEquals(trees.rowCount(), 1 << 14);
++        assertEquals(4, numTrees);
++
++        // With a single tree and a megabyte we would have a total resolution of 2^12 leaves; with multiple
++        // ranges we should get similar overall resolution, but not more.
++        assertTrue(totalResolution > (1 << 11) && totalResolution < (1 << 13));
++    }
++
++    private List<Range<Token>> splitHelper(Range<Token> range, int depth)
++    {
++        if (depth <= 0)
++        {
++            List<Range<Token>> tokens = new ArrayList<>();
++            tokens.add(range);
++            return tokens;
++        }
++        Token midpoint = partitioner.midpoint(range.left, range.right);
++        List<Range<Token>> left = splitHelper(new Range<>(range.left, midpoint), depth - 1);
++        List<Range<Token>> right = splitHelper(new Range<>(midpoint, range.right), depth - 1);
++        left.addAll(right);
++        return left;
++    }
++
 +    @Test
 +    public void testCountingHasher()
 +    {
 +        Hasher [] hashers = new Hasher[] {new Validator.CountingHasher(), Validator.CountingHasher.hashFunctions[0].newHasher(), Validator.CountingHasher.hashFunctions[1].newHasher() };
 +        byte [] random = UUIDGen.getTimeUUIDBytes();
 +
 +        // call all overloaded methods:
 +        for (Hasher hasher : hashers)
 +        {
 +            hasher.putByte((byte) 33)
 +                  .putBytes(random)
 +                  .putBytes(ByteBuffer.wrap(random))
 +                  .putBytes(random, 0, 3)
 +                  .putChar('a')
 +                  .putBoolean(false)
 +                  .putDouble(3.3)
 +                  .putInt(77)
 +                  .putFloat(99)
 +                  .putLong(101)
 +                  .putShort((short) 23);
 +        }
 +
 +        long len = Byte.BYTES
 +                   + random.length * 2 // both the byte[] and the ByteBuffer
 +                   + 3 // 3 bytes from the random byte[]
 +                   + Character.BYTES
 +                   + Byte.BYTES
 +                   + Double.BYTES
 +                   + Integer.BYTES
 +                   + Float.BYTES
 +                   + Long.BYTES
 +                   + Short.BYTES;
 +
 +        byte [] h = hashers[0].hash().asBytes();
 +        assertTrue(Arrays.equals(hashers[1].hash().asBytes(), Arrays.copyOfRange(h, 0, 16)));
 +        assertTrue(Arrays.equals(hashers[2].hash().asBytes(), Arrays.copyOfRange(h, 16, 32)));
 +        assertEquals(len, ((Validator.CountingHasher)hashers[0]).getCount());
 +    }
 +
      private CompletableFuture<MessageOut> registerOutgoingMessageSink()
      {
          final CompletableFuture<MessageOut> future = new CompletableFuture<>();
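
The `splitHelper` method added to ValidatorTest above bisects a range recursively, so a depth of 2 yields the 4 subranges that `testRangeSplittingTreeSizeLimit` expects. The following is a minimal, self-contained sketch of the same recursion. It assumes plain `long` bounds in place of Cassandra's `Range<Token>` and `partitioner.midpoint`, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: long[]{left, right} pairs stand in for Cassandra's Range<Token>.
final class RangeSplitSketch
{
    // Recursively bisects the range `depth` times, yielding 2^depth subranges in order.
    // Assumes left <= right and that (right - left) does not overflow a long.
    static List<long[]> split(long left, long right, int depth)
    {
        List<long[]> out = new ArrayList<>();
        if (depth <= 0)
        {
            out.add(new long[]{ left, right });
            return out;
        }
        long mid = left + (right - left) / 2;
        out.addAll(split(left, mid, depth - 1));
        out.addAll(split(mid, right, depth - 1));
        return out;
    }

    public static void main(String[] args)
    {
        for (long[] r : split(0, 1024, 2))
            System.out.println(r[0] + " .. " + r[1]);
    }
}
```

Each additional level doubles the number of subranges, which is why the test divides a fixed memory budget across more, shallower trees.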
diff --cc test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
index 64aea24,64aea24..c213271
--- a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
@@@ -19,14 -19,14 +19,18 @@@
  package org.apache.cassandra.utils;
  
  import java.math.BigInteger;
++import java.nio.ByteBuffer;
  import java.util.*;
  
  import com.google.common.collect.Lists;
  
++import org.junit.Assert;
  import org.junit.Before;
  import org.junit.Test;
  import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.dht.ByteOrderedPartitioner;
  import org.apache.cassandra.dht.IPartitioner;
++import org.apache.cassandra.dht.Murmur3Partitioner;
  import org.apache.cassandra.dht.RandomPartitioner;
  import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
  import org.apache.cassandra.dht.Range;
@@@ -62,7 -62,7 +66,7 @@@ public class MerkleTreeTes
      }
  
      @Before
--    public void clear()
++    public void setup()
      {
          TOKEN_SCALE = new BigInteger("8");
          partitioner = RandomPartitioner.instance;
@@@ -92,7 -92,7 +96,7 @@@
      {
          if (i == -1)
              return new BigIntegerToken(new BigInteger("-1"));
--        BigInteger bint = RandomPartitioner.MAXIMUM.divide(TOKEN_SCALE).multiply(new BigInteger(""+i));
++        BigInteger bint = RandomPartitioner.MAXIMUM.divide(TOKEN_SCALE).multiply(new BigInteger("" + i));
          return new BigIntegerToken(bint);
      }
  
@@@ -113,10 -113,10 +117,10 @@@
          assertEquals(new Range<>(tok(6), tok(7)), mt.get(tok(7)));
  
          // check depths
--        assertEquals((byte)1, mt.get(tok(4)).depth);
--        assertEquals((byte)2, mt.get(tok(6)).depth);
--        assertEquals((byte)3, mt.get(tok(7)).depth);
--        assertEquals((byte)3, mt.get(tok(-1)).depth);
++        assertEquals((byte) 1, mt.get(tok(4)).depth);
++        assertEquals((byte) 2, mt.get(tok(6)).depth);
++        assertEquals((byte) 3, mt.get(tok(7)).depth);
++        assertEquals((byte) 3, mt.get(tok(-1)).depth);
  
          try
          {
@@@ -132,7 -132,7 +136,7 @@@
      @Test
      public void testSplitLimitDepth()
      {
--        mt = new MerkleTree(partitioner, fullRange(), (byte)2, Integer.MAX_VALUE);
++        mt = new MerkleTree(partitioner, fullRange(), (byte) 2, Integer.MAX_VALUE);
  
          assertTrue(mt.split(tok(4)));
          assertTrue(mt.split(tok(2)));
@@@ -472,7 -472,7 +476,7 @@@
  
          List<TreeRange> diffs = MerkleTree.difference(ltree, rtree);
          assertEquals(Lists.newArrayList(range), diffs);
--        assertEquals(MerkleTree.FULLY_INCONSISTENT, MerkleTree.differenceHelper(ltree, rtree, new ArrayList<>(), new MerkleTree.TreeDifference(ltree.fullRange.left, ltree.fullRange.right, (byte)0)));
++        assertEquals(MerkleTree.FULLY_INCONSISTENT, MerkleTree.differenceHelper(ltree, rtree, new ArrayList<>(), new MerkleTree.TreeDifference(ltree.fullRange.left, ltree.fullRange.right, (byte) 0)));
      }
  
      /**
@@@ -530,7 -530,7 +534,7 @@@
              {
                  // consume the stack
                  hash = Hashable.binaryHash(hstack.pop(), hash);
--                depth = dstack.pop()-1;
++                depth = dstack.pop() - 1;
              }
              dstack.push(depth);
              hstack.push(hash);
@@@ -563,4 -563,4 +567,80 @@@
              return endOfData();
          }
      }
++
++    @Test
++    public void testEstimatedSizes()
++    {
++        // With no or negative allowed space we should still get a depth of 1
++        Assert.assertEquals(1, MerkleTree.estimatedMaxDepthForBytes(Murmur3Partitioner.instance, -20, 32));
++        Assert.assertEquals(1, MerkleTree.estimatedMaxDepthForBytes(Murmur3Partitioner.instance, 0, 32));
++        Assert.assertEquals(1, MerkleTree.estimatedMaxDepthForBytes(Murmur3Partitioner.instance, 1, 32));
++
++        // The minimum of 1 megabyte split across RF=3 replicas should yield trees of depth around 10
++        Assert.assertEquals(10, MerkleTree.estimatedMaxDepthForBytes(Murmur3Partitioner.instance,
++                                                                     1048576 / 3, 32));
++
++        // With a single megabyte of space we should get 12
++        Assert.assertEquals(12, MerkleTree.estimatedMaxDepthForBytes(Murmur3Partitioner.instance,
++                                                                     1048576, 32));
++
++        // With 100 megabytes we should get a limit of 19
++        Assert.assertEquals(19, MerkleTree.estimatedMaxDepthForBytes(Murmur3Partitioner.instance,
++                                                                     100 * 1048576, 32));
++
++        // With 300 megabytes we should get the old limit of 20
++        Assert.assertEquals(20, MerkleTree.estimatedMaxDepthForBytes(Murmur3Partitioner.instance,
++                                                                     300 * 1048576, 32));
++        Assert.assertEquals(20, MerkleTree.estimatedMaxDepthForBytes(RandomPartitioner.instance,
++                                                                     300 * 1048576, 32));
++        Assert.assertEquals(20, MerkleTree.estimatedMaxDepthForBytes(ByteOrderedPartitioner.instance,
++                                                                     300 * 1048576, 32));
++    }
++
++    @Test
++    public void testEstimatedSizesRealMeasurement()
++    {
++        // Use a fixed source of randomness so that the test does not flake.
++        Random random = new Random(1);
++        checkEstimatedSizes(RandomPartitioner.instance, random);
++        checkEstimatedSizes(Murmur3Partitioner.instance, random);
++    }
++
++    private void checkEstimatedSizes(IPartitioner partitioner, Random random)
++    {
++        Range<Token> fullRange = new Range<>(partitioner.getMinimumToken(), partitioner.getMinimumToken());
++        MerkleTree tree = new MerkleTree(partitioner, fullRange, RECOMMENDED_DEPTH, 0);
++
++        // Test 16 kilobytes -> 8 megabytes (2^14 .. 2^23 bytes)
++        for (int i = 14; i < 24; i ++)
++        {
++            long numBytes = 1 << i;
++            int maxDepth = MerkleTree.estimatedMaxDepthForBytes(partitioner, numBytes, 32);
++            long realSizeOfMerkleTree = measureTree(tree, fullRange, maxDepth, random);
++            long biggerTreeSize = measureTree(tree, fullRange, maxDepth + 1, random);
++
++            Assert.assertTrue(realSizeOfMerkleTree < numBytes);
++            Assert.assertTrue(biggerTreeSize > numBytes);
++        }
++    }
++
++    private long measureTree(MerkleTree tree, Range<Token> fullRange, int depth, Random random)
++    {
++        tree = new MerkleTree(tree.partitioner(), fullRange, RECOMMENDED_DEPTH, (long) Math.pow(2, depth));
++        // Initializes it as a fully balanced tree.
++        tree.init();
++
++        byte[] key = new byte[128];
++        // Try to actually allocate some hashes. Note that this is not guaranteed to actually populate the tree,
++        // but we re-use the source of randomness to try to make it reproducible.
++        for (int i = 0; i < tree.maxsize() * 8; i++)
++        {
++            random.nextBytes(key);
++            Token token = tree.partitioner().getToken(ByteBuffer.wrap(key));
++            tree.get(token).addHash(new RowHash(token, new byte[32], 32));
++        }
++
++        tree.hash(fullRange);
++        return ObjectSizes.measureDeep(tree);
++    }
  }
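
The depths asserted in `testEstimatedSizes` follow from arithmetic on a balanced binary tree: a tree of depth d has 2^d leaves and 2^d - 1 inner nodes, so it fits in B bytes when 2^d <= (B + innerSize) / (leafSize + innerSize). The sketch below illustrates that bound. The per-node sizes `LEAF_BYTES` and `INNER_BYTES`, and the class and method names, are assumptions chosen for illustration. They are not values measured from `MerkleTree`, and the real `estimatedMaxDepthForBytes` may derive its sizes differently.

```java
// Sketch only: per-node heap sizes are assumed for illustration, not measured.
final class MerkleDepthSketch
{
    static final long LEAF_BYTES = 80;   // hypothetical heap cost of one leaf
    static final long INNER_BYTES = 104; // hypothetical heap cost of one inner node

    // Largest depth d such that 2^d leaves plus (2^d - 1) inner nodes fit in numBytes.
    // Never returns less than 1, even for zero or negative budgets.
    static int maxDepthForBytes(long numBytes)
    {
        long maxLeaves = Math.max(1, (numBytes + INNER_BYTES) / (LEAF_BYTES + INNER_BYTES));
        int floorLog2 = 63 - Long.numberOfLeadingZeros(maxLeaves); // exact floor(log2(maxLeaves))
        return Math.max(1, floorLog2);
    }

    public static void main(String[] args)
    {
        long oneMegabyte = 1L << 20;
        System.out.println(maxDepthForBytes(oneMegabyte));     // whole budget for one tree
        System.out.println(maxDepthForBytes(oneMegabyte / 4)); // budget split across 4 ranges
    }
}
```

Under these assumed sizes, one megabyte allows a depth of 12 and a quarter of a megabyte allows a depth of 10, which is the same shape as the single-range and four-range expectations in the tests above.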

