You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2019/08/02 13:18:41 UTC

[cassandra] branch 15202-4.0 created (now 2669127)

This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a change to branch 15202-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


      at 2669127  Marcus review feedback #2

This branch includes the following new commits:

     new 51a29f2  HIGHRES config.yml
     new be64c1c  Make repair coordination less expensive by moving MerkleTrees off heap
     new 7326132  Address Benedict's review feedback
     new 47864d0  Address Marcus's review feedback
     new 3f5bb3b  Some MT.difference() optimisations / more Marcus's feedback
     new 9f56a14  Consistent naming for offheap MT toggle param and methods
     new 575c5fa  Test difference()
     new f89ce38  nit: Statically import MT.Difference
     new 2669127  Marcus review feedback #2

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 08/09: nit: Statically import MT.Difference

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch 15202-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit f89ce38ed8d3efec0a38b0f6c557bb6b8b152a79
Author: Aleksey Yeshchenko <al...@apache.org>
AuthorDate: Thu Aug 1 17:49:39 2019 +0100

    nit: Statically import MT.Difference
---
 .../org/apache/cassandra/utils/MerkleTree.java     | 33 +++++++++++-----------
 1 file changed, 17 insertions(+), 16 deletions(-)

diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 4c385ab..288a5bb 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.utils.memory.MemoryUtil;
 import static java.lang.String.format;
 import static org.apache.cassandra.db.TypeSizes.sizeof;
 import static org.apache.cassandra.utils.ByteBufferUtil.compare;
+import static org.apache.cassandra.utils.MerkleTree.Difference.*;
 
 /**
  * A MerkleTree implemented as a binary tree.
@@ -221,7 +222,7 @@ public class MerkleTree
             else
             {
                 logger.debug("Digest mismatch detected, traversing trees [{}, {}]", ltree, rtree);
-                if (Difference.FULLY_INCONSISTENT == differenceHelper(ltree, rtree, diff, active))
+                if (FULLY_INCONSISTENT == differenceHelper(ltree, rtree, diff, active))
                 {
                     logger.debug("Range {} fully inconsistent", active);
                     diff.add(active);
@@ -244,7 +245,7 @@ public class MerkleTree
     static Difference differenceHelper(MerkleTree ltree, MerkleTree rtree, List<TreeRange> diff, TreeRange active)
     {
         if (active.depth == Byte.MAX_VALUE)
-            return Difference.CONSISTENT;
+            return CONSISTENT;
 
         Token midpoint = ltree.partitioner().midpoint(active.left, active.right);
         // sanity check for midpoint calculation, see CASSANDRA-13052
@@ -253,7 +254,7 @@ public class MerkleTree
             // If the midpoint equals either the left or the right, we have a range that's too small to split - we'll simply report the
             // whole range as inconsistent
             logger.debug("({}) No sane midpoint ({}) for range {} , marking whole range as inconsistent", active.depth, midpoint, active);
-            return Difference.FULLY_INCONSISTENT;
+            return FULLY_INCONSISTENT;
         }
 
         TreeRange left = new TreeRange(active.left, midpoint, active.depth + 1);
@@ -265,62 +266,62 @@ public class MerkleTree
         lnode = ltree.find(left);
         rnode = rtree.find(left);
 
-        Difference ldiff = Difference.CONSISTENT;
+        Difference ldiff = CONSISTENT;
         if (null != lnode && null != rnode && lnode.hashesDiffer(rnode))
         {
             logger.debug("({}) Inconsistent digest on left sub-range {}: [{}, {}]", active.depth, left, lnode, rnode);
 
             if (lnode instanceof Leaf)
-                ldiff = Difference.FULLY_INCONSISTENT;
+                ldiff = FULLY_INCONSISTENT;
             else
                 ldiff = differenceHelper(ltree, rtree, diff, left);
         }
         else if (null == lnode || null == rnode)
         {
             logger.debug("({}) Left sub-range fully inconsistent {}", active.depth, left);
-            ldiff = Difference.FULLY_INCONSISTENT;
+            ldiff = FULLY_INCONSISTENT;
         }
 
         // see if we should recurse right
         lnode = ltree.find(right);
         rnode = rtree.find(right);
 
-        Difference rdiff = Difference.CONSISTENT;
+        Difference rdiff = CONSISTENT;
         if (null != lnode && null != rnode && lnode.hashesDiffer(rnode))
         {
             logger.debug("({}) Inconsistent digest on right sub-range {}: [{}, {}]", active.depth, right, lnode, rnode);
 
             if (rnode instanceof Leaf)
-                rdiff = Difference.FULLY_INCONSISTENT;
+                rdiff = FULLY_INCONSISTENT;
             else
                 rdiff = differenceHelper(ltree, rtree, diff, right);
         }
         else if (null == lnode || null == rnode)
         {
             logger.debug("({}) Right sub-range fully inconsistent {}", active.depth, right);
-            rdiff = Difference.FULLY_INCONSISTENT;
+            rdiff = FULLY_INCONSISTENT;
         }
 
-        if (ldiff == Difference.FULLY_INCONSISTENT && rdiff == Difference.FULLY_INCONSISTENT)
+        if (ldiff == FULLY_INCONSISTENT && rdiff == FULLY_INCONSISTENT)
         {
             // both children are fully inconsistent
             logger.debug("({}) Fully inconsistent range [{}, {}]", active.depth, left, right);
-            return Difference.FULLY_INCONSISTENT;
+            return FULLY_INCONSISTENT;
         }
-        else if (ldiff == Difference.FULLY_INCONSISTENT)
+        else if (ldiff == FULLY_INCONSISTENT)
         {
             logger.debug("({}) Adding left sub-range to diff as fully inconsistent {}", active.depth, left);
             diff.add(left);
-            return Difference.PARTIALLY_INCONSISTENT;
+            return PARTIALLY_INCONSISTENT;
         }
-        else if (rdiff == Difference.FULLY_INCONSISTENT)
+        else if (rdiff == FULLY_INCONSISTENT)
         {
             logger.debug("({}) Adding right sub-range to diff as fully inconsistent {}", active.depth, right);
             diff.add(right);
-            return Difference.PARTIALLY_INCONSISTENT;
+            return PARTIALLY_INCONSISTENT;
         }
         logger.debug("({}) Range {} partially inconstent", active.depth, active);
-        return Difference.PARTIALLY_INCONSISTENT;
+        return PARTIALLY_INCONSISTENT;
     }
 
     /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 04/09: Address Marcus's review feedback

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch 15202-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 47864d09706927801ac1150199979cf3b3bc1c1e
Author: Aleksey Yeshchenko <al...@apache.org>
AuthorDate: Fri Jul 26 16:52:40 2019 +0100

    Address Marcus's review feedback
---
 src/java/org/apache/cassandra/utils/MerkleTree.java | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 2052389..bdeaa3f 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -262,7 +262,7 @@ public class MerkleTree
         rnode = rtree.find(left);
 
         Difference ldiff = Difference.CONSISTENT;
-        if (lnode.hashesDiffer(rnode))
+        if (null != lnode && null != rnode && lnode.hashesDiffer(rnode))
         {
             logger.debug("({}) Inconsistent digest on left sub-range {}: [{}, {}]", active.depth, left, lnode, rnode);
 
@@ -271,13 +271,18 @@ public class MerkleTree
             else
                 ldiff = differenceHelper(ltree, rtree, diff, left);
         }
+        else if (null == lnode || null == rnode)
+        {
+            logger.debug("({}) Left sub-range fully inconsistent {}", active.depth, left);
+            ldiff = Difference.FULLY_INCONSISTENT;
+        }
 
         // see if we should recurse right
         lnode = ltree.find(right);
         rnode = rtree.find(right);
 
         Difference rdiff = Difference.CONSISTENT;
-        if (lnode.hashesDiffer(rnode))
+        if (null != lnode && null != rnode && lnode.hashesDiffer(rnode))
         {
             logger.debug("({}) Inconsistent digest on right sub-range {}: [{}, {}]", active.depth, right, lnode, rnode);
 
@@ -286,6 +291,11 @@ public class MerkleTree
             else
                 rdiff = differenceHelper(ltree, rtree, diff, right);
         }
+        else if (null == lnode || null == rnode)
+        {
+            logger.debug("({}) Right sub-range fully inconsistent {}", active.depth, right);
+            rdiff = Difference.FULLY_INCONSISTENT;
+        }
 
         if (ldiff == Difference.FULLY_INCONSISTENT && rdiff == Difference.FULLY_INCONSISTENT)
         {
@@ -326,7 +336,7 @@ public class MerkleTree
      * @return {@link Node} found. If nothing found, return {@link Leaf} with empty hash.
      */
     @VisibleForTesting
-    Node find(Range<Token> range)
+    private Node find(Range<Token> range)
     {
         try
         {
@@ -334,7 +344,7 @@ public class MerkleTree
         }
         catch (StopRecursion e)
         {
-            return new OnHeapLeaf();
+            return null;
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 03/09: Address Benedict's review feedback

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch 15202-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 73261324e27049a95b541d82eeacedf509c9ae48
Author: Aleksey Yeshchenko <al...@apache.org>
AuthorDate: Tue Jul 23 10:28:03 2019 +0100

    Address Benedict's review feedback
---
 .../org/apache/cassandra/utils/FBUtilities.java    |  40 --
 .../org/apache/cassandra/utils/MerkleTree.java     | 410 ++++++++++++---------
 .../org/apache/cassandra/utils/MerkleTrees.java    |   5 +-
 .../repair/asymmetric/DifferenceHolderTest.java    |  10 +-
 .../org/apache/cassandra/utils/MerkleTreeTest.java |  16 +-
 .../apache/cassandra/utils/MerkleTreesTest.java    |  13 +-
 6 files changed, 259 insertions(+), 235 deletions(-)

diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 0f7b2ca..f0d9132 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -265,46 +265,6 @@ public class FBUtilities
         return compareUnsigned(bytes1, bytes2, 0, 0, bytes1.length, bytes2.length);
     }
 
-    /**
-     * @return The bitwise XOR of the inputs. The output will be the same length as the
-     * longer input, but if either input is null, the output will be null.
-     */
-    public static byte[] xor(byte[] left, byte[] right)
-    {
-        if (left == null || right == null)
-            return null;
-        if (left.length > right.length)
-        {
-            byte[] swap = left;
-            left = right;
-            right = swap;
-        }
-
-        // left.length is now <= right.length
-        byte[] out = Arrays.copyOf(right, right.length);
-        for (int i = 0; i < left.length; i++)
-        {
-            out[i] = (byte)((left[i] & 0xFF) ^ (right[i] & 0xFF));
-        }
-        return out;
-    }
-
-    /**
-     * Bitwise XOR of the inputs, in place on the left array
-     *
-     * Assumes inputs are same length
-     */
-    static void xorOntoLeft(byte[] left, byte[] right)
-    {
-        if (left == null || right == null)
-            return;
-
-        assert left.length == right.length;
-
-        for (int i = 0; i < left.length; i++)
-            left[i] = (byte) ((left[i] & 0xFF) ^ (right[i] & 0xFF));
-    }
-
     public static void sortSampledKeys(List<DecoratedKey> keys, Range<Token> range)
     {
         if (range.left.compareTo(range.right) >= 0)
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 9d9eadb..2052389 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.memory.MemoryUtil;
 
+import static java.lang.String.format;
 import static org.apache.cassandra.db.TypeSizes.sizeof;
 import static org.apache.cassandra.utils.ByteBufferUtil.compare;
 
@@ -86,12 +87,6 @@ public class MerkleTree
 
     public static final byte RECOMMENDED_DEPTH = Byte.MAX_VALUE - 1;
 
-    @SuppressWarnings("WeakerAccess")
-    static final int CONSISTENT = 0;
-    static final int FULLY_INCONSISTENT = 1;
-    @SuppressWarnings("WeakerAccess")
-    static final int PARTIALLY_INCONSISTENT = 2;
-
     private final int hashdepth;
 
     /** The top level range that this MerkleTree covers. */
@@ -150,13 +145,6 @@ public class MerkleTree
         size = (long) Math.pow(2, depth);
     }
 
-    public void release()
-    {
-        if (root instanceof OffHeapNode)
-            ((OffHeapNode) root).release();
-        root = null;
-    }
-
     private OnHeapNode initHelper(Token left, Token right, int depth, int max)
     {
         if (depth == max)
@@ -172,6 +160,13 @@ public class MerkleTree
         return new OnHeapInner(midpoint, leftChild, rightChild);
     }
 
+    public void release()
+    {
+        if (root instanceof OffHeapNode)
+            ((OffHeapNode) root).release();
+        root = null;
+    }
+
     public IPartitioner partitioner()
     {
         return partitioner;
@@ -222,7 +217,7 @@ public class MerkleTree
             else
             {
                 logger.debug("Digest mismatch detected, traversing trees [{}, {}]", ltree, rtree);
-                if (FULLY_INCONSISTENT == differenceHelper(ltree, rtree, diff, active))
+                if (Difference.FULLY_INCONSISTENT == differenceHelper(ltree, rtree, diff, active))
                 {
                     logger.debug("Range {} fully inconsistent", active);
                     diff.add(active);
@@ -233,6 +228,8 @@ public class MerkleTree
         return diff;
     }
 
+    enum Difference { CONSISTENT, FULLY_INCONSISTENT, PARTIALLY_INCONSISTENT }
+
     /**
      * TODO: This function could be optimized into a depth first traversal of the two trees in parallel.
      *
@@ -240,10 +237,10 @@ public class MerkleTree
      * @return FULLY_INCONSISTENT if active is inconsistent, PARTIALLY_INCONSISTENT if only a subrange is inconsistent.
      */
     @VisibleForTesting
-    static int differenceHelper(MerkleTree ltree, MerkleTree rtree, List<TreeRange> diff, TreeRange active)
+    static Difference differenceHelper(MerkleTree ltree, MerkleTree rtree, List<TreeRange> diff, TreeRange active)
     {
         if (active.depth == Byte.MAX_VALUE)
-            return CONSISTENT;
+            return Difference.CONSISTENT;
 
         Token midpoint = ltree.partitioner().midpoint(active.left, active.right);
         // sanity check for midpoint calculation, see CASSANDRA-13052
@@ -252,7 +249,7 @@ public class MerkleTree
             // If the midpoint equals either the left or the right, we have a range that's too small to split - we'll simply report the
             // whole range as inconsistent
             logger.debug("({}) No sane midpoint ({}) for range {} , marking whole range as inconsistent", active.depth, midpoint, active);
-            return FULLY_INCONSISTENT;
+            return Difference.FULLY_INCONSISTENT;
         }
 
         TreeRange left = new TreeRange(active.left, midpoint, active.depth + 1);
@@ -264,13 +261,13 @@ public class MerkleTree
         lnode = ltree.find(left);
         rnode = rtree.find(left);
 
-        int ldiff = CONSISTENT;
+        Difference ldiff = Difference.CONSISTENT;
         if (lnode.hashesDiffer(rnode))
         {
             logger.debug("({}) Inconsistent digest on left sub-range {}: [{}, {}]", active.depth, left, lnode, rnode);
 
             if (lnode instanceof Leaf)
-                ldiff = FULLY_INCONSISTENT;
+                ldiff = Difference.FULLY_INCONSISTENT;
             else
                 ldiff = differenceHelper(ltree, rtree, diff, left);
         }
@@ -279,118 +276,37 @@ public class MerkleTree
         lnode = ltree.find(right);
         rnode = rtree.find(right);
 
-        int rdiff = CONSISTENT;
+        Difference rdiff = Difference.CONSISTENT;
         if (lnode.hashesDiffer(rnode))
         {
             logger.debug("({}) Inconsistent digest on right sub-range {}: [{}, {}]", active.depth, right, lnode, rnode);
 
             if (rnode instanceof Leaf)
-                rdiff = FULLY_INCONSISTENT;
+                rdiff = Difference.FULLY_INCONSISTENT;
             else
                 rdiff = differenceHelper(ltree, rtree, diff, right);
         }
 
-        if (ldiff == FULLY_INCONSISTENT && rdiff == FULLY_INCONSISTENT)
+        if (ldiff == Difference.FULLY_INCONSISTENT && rdiff == Difference.FULLY_INCONSISTENT)
         {
             // both children are fully inconsistent
             logger.debug("({}) Fully inconsistent range [{}, {}]", active.depth, left, right);
-            return FULLY_INCONSISTENT;
+            return Difference.FULLY_INCONSISTENT;
         }
-        else if (ldiff == FULLY_INCONSISTENT)
+        else if (ldiff == Difference.FULLY_INCONSISTENT)
         {
             logger.debug("({}) Adding left sub-range to diff as fully inconsistent {}", active.depth, left);
             diff.add(left);
-            return PARTIALLY_INCONSISTENT;
+            return Difference.PARTIALLY_INCONSISTENT;
         }
-        else if (rdiff == FULLY_INCONSISTENT)
+        else if (rdiff == Difference.FULLY_INCONSISTENT)
         {
             logger.debug("({}) Adding right sub-range to diff as fully inconsistent {}", active.depth, right);
             diff.add(right);
-            return PARTIALLY_INCONSISTENT;
+            return Difference.PARTIALLY_INCONSISTENT;
         }
         logger.debug("({}) Range {} partially inconstent", active.depth, active);
-        return PARTIALLY_INCONSISTENT;
-    }
-
-    /**
-     * For testing purposes.
-     * Gets the smallest range containing the token.
-     */
-    public TreeRange get(Token t)
-    {
-        return getHelper(root, fullRange.left, fullRange.right, t);
-    }
-
-    private TreeRange getHelper(Node node, Token pleft, Token pright, Token t)
-    {
-        int depth = 0;
-
-        while (true)
-        {
-            if (node instanceof Leaf)
-            {
-                // we've reached a hash: wrap it up and deliver it
-                return new TreeRange(this, pleft, pright, depth, node);
-            }
-
-            assert node instanceof Inner;
-            Inner inner = (Inner) node;
-
-            if (Range.contains(pleft, inner.token(), t)) // left child contains token
-            {
-                pright = inner.token();
-                node = inner.left();
-            }
-            else // right child contains token
-            {
-                pleft = inner.token();
-                node = inner.right();
-            }
-
-            depth++;
-        }
-    }
-
-    /**
-     * Invalidates the ranges containing the given token.
-     * Useful for testing.
-     */
-    public void invalidate(Token t)
-    {
-        invalidateHelper(root, fullRange.left, t);
-    }
-
-    private void invalidateHelper(Node node, Token pleft, Token t)
-    {
-        node.hash(EMPTY_HASH);
-
-        if (node instanceof Leaf)
-            return;
-
-        assert node instanceof Inner;
-        Inner inner = (Inner) node;
-        // TODO: reset computed flag on OnHeapInners
-
-        if (Range.contains(pleft, inner.token(), t))
-            invalidateHelper(inner.left(), pleft, t); // left child contains token
-        else
-            invalidateHelper(inner.right(), inner.token(), t); // right child contains token
-    }
-
-    /**
-     * Hash the given range in the tree. The range must have been generated
-     * with recursive applications of partitioner.midpoint().
-     *
-     * NB: Currently does not support wrapping ranges that do not end with
-     * partitioner.getMinimumToken().
-     *
-     * @return {@link #EMPTY_HASH} if any subrange of the range is invalid, or if the exact
-     *         range cannot be calculated using this tree.
-     */
-    @VisibleForTesting
-    public byte[] hash(Range<Token> range)
-    {
-        return find(range).hash();
+        return Difference.PARTIALLY_INCONSISTENT;
     }
 
     /**
@@ -409,46 +325,19 @@ public class MerkleTree
      * @param range Range to find
      * @return {@link Node} found. If nothing found, return {@link Leaf} with empty hash.
      */
-    private Node find(Range<Token> range)
-    {
-        try
-        {
-            return findHelper(root, new Range<>(fullRange.left, fullRange.right), range);
-        }
-        catch (StopRecursion e)
-        {
-            return new OnHeapLeaf();
-        }
-    }
-
-    interface Consumer<E extends Exception>
-    {
-        void accept(Node node) throws E;
-    }
-
     @VisibleForTesting
-    <E extends Exception> boolean ifHashesRange(Range<Token> range, Consumer<E> consumer) throws E
+    Node find(Range<Token> range)
     {
         try
         {
-            Node node = findHelper(root, new Range<>(fullRange.left, fullRange.right), range);
-            boolean hasHash = !node.hasEmptyHash();
-            if (hasHash)
-                consumer.accept(node);
-            return hasHash;
+            return findHelper(root, fullRange, range);
         }
         catch (StopRecursion e)
         {
-            return false;
+            return new OnHeapLeaf();
         }
     }
 
-    @VisibleForTesting
-    boolean hashesRange(Range<Token> range)
-    {
-        return ifHashesRange(range, n -> {});
-    }
-
     /**
      * @throws StopRecursion If no match could be found for the range.
      */
@@ -812,18 +701,33 @@ public class MerkleTree
         return new MerkleTree(root, partitioner, fullRange, hashDepth, maxSize, innerNodeCount);
     }
 
-    private static boolean warnedOnce;
-    private static Node deserializeTree(DataInputPlus in, IPartitioner partitioner, int innerNodeCount, boolean offHeapRequested, int version) throws IOException
+    private static boolean shouldUseOffHeapTrees(IPartitioner partitioner, boolean offHeapRequested)
     {
         boolean offHeapSupported = partitioner instanceof Murmur3Partitioner || partitioner instanceof RandomPartitioner;
 
         if (offHeapRequested && !offHeapSupported && !warnedOnce)
         {
-            logger.warn("Configuration requests offheap memtables, but partitioner does not support it. Ignoring.");
+            logger.warn("Configuration requests off-heap merkle trees, but partitioner does not support it. Ignoring.");
             warnedOnce = true;
         }
 
-        return offHeapRequested && offHeapSupported
+        return offHeapRequested && offHeapSupported;
+    }
+    private static boolean warnedOnce;
+
+    private static ByteBuffer allocate(int innerNodeCount, IPartitioner partitioner)
+    {
+        int size = offHeapBufferSize(innerNodeCount, partitioner);
+        logger.debug("Allocating direct buffer of size {} for an off-heap merkle tree", size);
+        ByteBuffer buffer = ByteBuffer.allocateDirect(size);
+        if (Ref.DEBUG_ENABLED)
+            MemoryUtil.setAttachment(buffer, new Ref<>(null, null));
+        return buffer;
+    }
+
+    private static Node deserializeTree(DataInputPlus in, IPartitioner partitioner, int innerNodeCount, boolean offHeapRequested, int version) throws IOException
+    {
+        return shouldUseOffHeapTrees(partitioner, offHeapRequested)
              ? deserializeOffHeap(in, partitioner, innerNodeCount, version)
              : OnHeapNode.deserialize(in, partitioner, version);
     }
@@ -833,39 +737,27 @@ public class MerkleTree
      * On the deserialization path, we know in advance what the tree looks like,
      * So we can pre-size an offheap buffer and deserialize into that.
      */
-
     MerkleTree tryMoveOffHeap() throws IOException
     {
-        boolean offHeapEnabled = DatabaseDescriptor.getOffheapMerkleTreesEnabled();
-        boolean offHeapSupported = partitioner instanceof Murmur3Partitioner || partitioner instanceof RandomPartitioner;
-
-        if (offHeapEnabled && !offHeapSupported && !warnedOnce)
-        {
-            logger.warn("Configuration requests offheap memtables, but partitioner does not support it. Ignoring.");
-            warnedOnce = true;
-        }
-
-        return root instanceof OnHeapNode && offHeapEnabled && offHeapSupported ? moveOffHeap() : this;
+        return root instanceof OnHeapNode && shouldUseOffHeapTrees(partitioner, DatabaseDescriptor.getOffheapMerkleTreesEnabled())
+             ? moveOffHeap()
+             : this;
     }
 
     private MerkleTree moveOffHeap() throws IOException
     {
         assert root instanceof OnHeapNode;
-        int bufferSize = offHeapBufferSize(Ints.checkedCast(size), partitioner);
-        logger.debug("Allocating direct buffer of size {} to move merkle tree off heap", bufferSize);
-        ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
+        ByteBuffer buffer = allocate(Ints.checkedCast(size), partitioner);
         int pointer = ((OnHeapNode) root).serializeOffHeap(buffer, partitioner);
-        OffHeapNode newRoot = fromPointer(pointer, buffer, partitioner).attachRef();
+        OffHeapNode newRoot = fromPointer(pointer, buffer, partitioner);
         return new MerkleTree(newRoot, partitioner, fullRange, hashdepth, maxsize, size);
     }
 
     private static OffHeapNode deserializeOffHeap(DataInputPlus in, IPartitioner partitioner, int innerNodeCount, int version) throws IOException
     {
-        int bufferSize = offHeapBufferSize(innerNodeCount, partitioner);
-        logger.debug("Allocating direct buffer of size {} for merkle tree deserialization", bufferSize);
-        ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
+        ByteBuffer buffer = allocate(innerNodeCount, partitioner);
         int pointer = OffHeapNode.deserialize(in, buffer, partitioner, version);
-        return fromPointer(pointer, buffer, partitioner).attachRef();
+        return fromPointer(pointer, buffer, partitioner);
     }
 
     private static OffHeapNode fromPointer(int pointer, ByteBuffer buffer, IPartitioner partitioner)
@@ -1059,13 +951,6 @@ public class MerkleTree
             return false;
         }
 
-        OffHeapNode attachRef()
-        {
-            if (Ref.DEBUG_ENABLED)
-                MemoryUtil.setAttachment(buffer, new Ref<>(this, null));
-            return this;
-        }
-
         void release()
         {
             Object attachment = MemoryUtil.getAttachment(buffer);
@@ -1160,7 +1045,7 @@ public class MerkleTree
             if (hasEmptyHash())
                 hash(partitionHash);
             else
-                FBUtilities.xorOntoLeft(hash, partitionHash);
+                xorOntoLeft(hash, partitionHash);
 
             sizeOfRange += partitionSize;
             partitionsInRange += 1;
@@ -1168,15 +1053,17 @@ public class MerkleTree
 
         static OnHeapLeaf deserializeWithoutIdent(DataInputPlus in) throws IOException
         {
-            if (in.readByte() > 0)
+            int size = in.readByte();
+            switch (size)
             {
-                byte[] hash = new byte[HASH_SIZE];
-                in.readFully(hash);
-                return new OnHeapLeaf(hash);
-            }
-            else
-            {
-                return new OnHeapLeaf();
+                case HASH_SIZE:
+                    byte[] hash = new byte[HASH_SIZE];
+                    in.readFully(hash);
+                    return new OnHeapLeaf(hash);
+                case 0:
+                    return new OnHeapLeaf();
+                default:
+                    throw new IllegalStateException(format("Hash of size %d encountered, expecting %d or %d", size, HASH_SIZE, 0));
             }
         }
 
@@ -1318,6 +1205,10 @@ public class MerkleTree
             Inner that = (Inner) other;
             return !hashesDiffer(other) && this.left().equals(that.left()) && this.right().equals(that.right());
         }
+
+        default void unsafeInvalidate()
+        {
+        }
     }
 
     static class OnHeapInner extends OnHeapNode implements Inner
@@ -1372,7 +1263,7 @@ public class MerkleTree
                 right.compute();
 
                 if (!left.hasEmptyHash() && !right.hasEmptyHash())
-                    hash = FBUtilities.xor(left.hash(), right.hash());
+                    hash = xor(left.hash(), right.hash());
                 else if (left.hasEmptyHash())
                     hash = right.hash();
                 else if (right.hasEmptyHash())
@@ -1432,6 +1323,12 @@ public class MerkleTree
             toString(buff, 1);
             return buff.toString();
         }
+
+        @Override
+        public void unsafeInvalidate()
+        {
+            computed = false;
+        }
     }
 
     static class OffHeapInner extends OffHeapNode implements Inner
@@ -1468,13 +1365,17 @@ public class MerkleTree
 
         public Node left()
         {
-            int pointer = buffer.getInt(offset + LEFT_CHILD_POINTER_OFFSET);
-            return pointer >= 0 ? new OffHeapInner(buffer, pointer, partitioner) : new OffHeapLeaf(buffer, ~pointer);
+            return child(LEFT_CHILD_POINTER_OFFSET);
         }
 
         public Node right()
         {
-            int pointer = buffer.getInt(offset + RIGHT_CHILD_POINTER_OFFSET);
+            return child(RIGHT_CHILD_POINTER_OFFSET);
+        }
+
+        private Node child(int childOffset)
+        {
+            int pointer = buffer.getInt(offset + childOffset);
             return pointer >= 0 ? new OffHeapInner(buffer, pointer, partitioner) : new OffHeapLeaf(buffer, ~pointer);
         }
 
@@ -1539,6 +1440,30 @@ public class MerkleTree
     }
 
     /**
+     * @return The bitwise XOR of the inputs.
+     */
+    static byte[] xor(byte[] left, byte[] right)
+    {
+        assert left.length == right.length;
+
+        byte[] out = Arrays.copyOf(right, right.length);
+        for (int i = 0; i < left.length; i++)
+            out[i] = (byte)((left[i] & 0xFF) ^ (right[i] & 0xFF));
+        return out;
+    }
+
+    /**
+     * Bitwise XOR of the inputs, in place on the left array.
+     */
+    private static void xorOntoLeft(byte[] left, byte[] right)
+    {
+        assert left.length == right.length;
+
+        for (int i = 0; i < left.length; i++)
+            left[i] = (byte) ((left[i] & 0xFF) ^ (right[i] & 0xFF));
+    }
+
+    /**
      * Estimate the allowable depth while keeping the resulting heap usage of this tree under the provided
      * number of bytes. This is important for ensuring that we do not allocate overly large trees that could
      * OOM the JVM and cause instability.
@@ -1583,4 +1508,125 @@ public class MerkleTree
         long adjustedBytes = Math.max(1, (numBytes + sizeOfInner) / (sizeOfLeaf + sizeOfInner));
         return Math.max(1, (int) Math.floor(Math.log(adjustedBytes) / Math.log(2)));
     }
+
+    /*
+     * Test-only methods.
+     */
+
+    /**
+     * Invalidates the ranges containing the given token.
+     * Useful for testing.
+     */
+    @VisibleForTesting
+    void unsafeInvalidate(Token t)
+    {
+        unsafeInvalidateHelper(root, fullRange.left, t);
+    }
+
+    private void unsafeInvalidateHelper(Node node, Token pleft, Token t)
+    {
+        node.hash(EMPTY_HASH);
+
+        if (node instanceof Leaf)
+            return;
+
+        assert node instanceof Inner;
+        Inner inner = (Inner) node;
+        inner.unsafeInvalidate();
+
+        if (Range.contains(pleft, inner.token(), t))
+            unsafeInvalidateHelper(inner.left(), pleft, t); // left child contains token
+        else
+            unsafeInvalidateHelper(inner.right(), inner.token(), t); // right child contains token
+    }
+
+    /**
+     * Hash the given range in the tree. The range must have been generated
+     * with recursive applications of partitioner.midpoint().
+     *
+     * NB: Currently does not support wrapping ranges that do not end with
+     * partitioner.getMinimumToken().
+     *
+     * @return {@link #EMPTY_HASH} if any subrange of the range is invalid, or if the exact
+     *         range cannot be calculated using this tree.
+     */
+    @VisibleForTesting
+    byte[] hash(Range<Token> range)
+    {
+        return find(range).hash();
+    }
+
+    interface Consumer<E extends Exception>
+    {
+        void accept(Node node) throws E;
+    }
+
+    @VisibleForTesting
+    <E extends Exception> boolean ifHashesRange(Range<Token> range, Consumer<E> consumer) throws E
+    {
+        try
+        {
+            Node node = findHelper(root, new Range<>(fullRange.left, fullRange.right), range);
+            boolean hasHash = !node.hasEmptyHash();
+            if (hasHash)
+                consumer.accept(node);
+            return hasHash;
+        }
+        catch (StopRecursion e)
+        {
+            return false;
+        }
+    }
+
+    @VisibleForTesting
+    boolean hashesRange(Range<Token> range)
+    {
+        return ifHashesRange(range, n -> {});
+    }
+
+    /**
+     * For testing purposes.
+     * Gets the smallest range containing the token.
+     */
+    @VisibleForTesting
+    public TreeRange get(Token t)
+    {
+        return getHelper(root, fullRange.left, fullRange.right, t);
+    }
+
+    private TreeRange getHelper(Node node, Token pleft, Token pright, Token t)
+    {
+        int depth = 0;
+
+        while (true)
+        {
+            if (node instanceof Leaf)
+            {
+                // we've reached a hash: wrap it up and deliver it
+                return new TreeRange(this, pleft, pright, depth, node);
+            }
+
+            assert node instanceof Inner;
+            Inner inner = (Inner) node;
+
+            if (Range.contains(pleft, inner.token(), t)) // left child contains token
+            {
+                pright = inner.token();
+                node = inner.left();
+            }
+            else // right child contains token
+            {
+                pleft = inner.token();
+                node = inner.right();
+            }
+
+            depth++;
+        }
+    }
+
+    @VisibleForTesting
+    void compute()
+    {
+        root.compute();
+    }
 }
diff --git a/src/java/org/apache/cassandra/utils/MerkleTrees.java b/src/java/org/apache/cassandra/utils/MerkleTrees.java
index 9cf04c5..0043fe0 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTrees.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTrees.java
@@ -147,7 +147,8 @@ public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>
      */
     public void release()
     {
-        merkleTrees.values().forEach(MerkleTree::release); merkleTrees.clear();
+        merkleTrees.values().forEach(MerkleTree::release);
+        merkleTrees.clear();
     }
 
     /**
@@ -179,7 +180,7 @@ public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>
     @VisibleForTesting
     public void invalidate(Token t)
     {
-        getMerkleTree(t).invalidate(t);
+        getMerkleTree(t).unsafeInvalidate(t);
     }
 
     /**
diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java
index de69cd7..8881018 100644
--- a/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java
+++ b/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.TreeResponse;
+import org.apache.cassandra.utils.HashingUtils;
 import org.apache.cassandra.utils.MerkleTree;
 import org.apache.cassandra.utils.MerkleTrees;
 import org.apache.cassandra.utils.MerkleTreesTest;
@@ -40,6 +41,11 @@ import static org.junit.Assert.assertTrue;
 
 public class DifferenceHolderTest
 {
+    private static byte[] digest(String string)
+    {
+        return HashingUtils.newMessageDigest("SHA-256").digest(string.getBytes());
+    }
+
     @Test
     public void testFromEmptyMerkleTrees() throws UnknownHostException
     {
@@ -91,8 +97,8 @@ public class DifferenceHolderTest
 
         // set the hashes for the leaf of the created split
         middle = mt1.get(leftmost.right);
-        middle.hash("arbitrary!".getBytes());
-        mt1.get(partitioner.midpoint(leftmost.left, leftmost.right)).hash("even more arbitrary!".getBytes());
+        middle.hash(digest("arbitrary!"));
+        mt1.get(partitioner.midpoint(leftmost.left, leftmost.right)).hash(digest("even more arbitrary!"));
 
         TreeResponse tr1 = new TreeResponse(a1, mt1);
         TreeResponse tr2 = new TreeResponse(a2, mt2);
diff --git a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
index d8491d0..6ce2652 100644
--- a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
@@ -46,7 +46,12 @@ import static org.junit.Assert.*;
 
 public class MerkleTreeTest
 {
-    private static final byte[] DUMMY = HashingUtils.newMessageDigest("SHA-256").digest("dummy".getBytes());
+    private static final byte[] DUMMY = digest("dummy");
+
+    private static byte[] digest(String string)
+    {
+        return HashingUtils.newMessageDigest("SHA-256").digest(string.getBytes());
+    }
 
     /**
      * If a test assumes that the tree is 8 units wide, then it should set this value
@@ -452,8 +457,8 @@ public class MerkleTreeTest
 
         // set the hashes for the leaf of the created split
         middle = mt.get(leftmost.right);
-        middle.hash("arbitrary!".getBytes());
-        mt.get(partitioner.midpoint(leftmost.left, leftmost.right)).hash("even more arbitrary!".getBytes());
+        middle.hash(digest("arbitrary!"));
+        mt.get(partitioner.midpoint(leftmost.left, leftmost.right)).hash(digest("even more arbitrary!"));
 
         // trees should disagree for (leftmost.left, middle.right]
         List<TreeRange> diffs = MerkleTree.difference(mt, mt2);
@@ -491,7 +496,8 @@ public class MerkleTreeTest
 
         List<TreeRange> diffs = MerkleTree.difference(ltree, rtree);
         assertEquals(Lists.newArrayList(range), diffs);
-        assertEquals(MerkleTree.FULLY_INCONSISTENT, MerkleTree.differenceHelper(ltree, rtree, new ArrayList<>(), new MerkleTree.TreeRange(ltree.fullRange.left, ltree.fullRange.right, (byte)0)));
+        assertEquals(MerkleTree.Difference.FULLY_INCONSISTENT,
+                     MerkleTree.differenceHelper(ltree, rtree, new ArrayList<>(), new MerkleTree.TreeRange(ltree.fullRange.left, ltree.fullRange.right, (byte)0)));
     }
 
     /**
@@ -548,7 +554,7 @@ public class MerkleTreeTest
             while (depth.equals(dstack.peek()))
             {
                 // consume the stack
-                hash = FBUtilities.xor(hstack.pop(), hash);
+                hash = MerkleTree.xor(hstack.pop(), hash);
                 depth = dstack.pop() - 1;
             }
             dstack.push(depth);
diff --git a/test/unit/org/apache/cassandra/utils/MerkleTreesTest.java b/test/unit/org/apache/cassandra/utils/MerkleTreesTest.java
index a1f6068..9e70c20 100644
--- a/test/unit/org/apache/cassandra/utils/MerkleTreesTest.java
+++ b/test/unit/org/apache/cassandra/utils/MerkleTreesTest.java
@@ -42,7 +42,12 @@ import static org.junit.Assert.*;
 
 public class MerkleTreesTest
 {
-    private static final byte[] DUMMY = HashingUtils.newMessageDigest("SHA-256").digest("dummy".getBytes());
+    private static final byte[] DUMMY = digest("dummy");
+
+    private static byte[] digest(String string)
+    {
+        return HashingUtils.newMessageDigest("SHA-256").digest(string.getBytes());
+    }
 
     /**
      * If a test assumes that the tree is 8 units wide, then it should set this value
@@ -469,8 +474,8 @@ public class MerkleTreesTest
 
         // set the hashes for the leaf of the created split
         middle = mts.get(leftmost.right);
-        middle.hash("arbitrary!".getBytes());
-        mts.get(partitioner.midpoint(leftmost.left, leftmost.right)).hash("even more arbitrary!".getBytes());
+        middle.hash(digest("arbitrary!"));
+        mts.get(partitioner.midpoint(leftmost.left, leftmost.right)).hash(digest("even more arbitrary!"));
 
         // trees should disagree for (leftmost.left, middle.right]
         List<Range<Token>> diffs = MerkleTrees.difference(mts, mts2);
@@ -499,7 +504,7 @@ public class MerkleTreesTest
             while (depth.equals(dstack.peek()))
             {
                 // consume the stack
-                hash = FBUtilities.xor(hstack.pop(), hash);
+                hash = MerkleTree.xor(hstack.pop(), hash);
                 depth = dstack.pop()-1;
             }
             dstack.push(depth);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/09: HIGHRES config.yml

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch 15202-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 51a29f21cf3f12dde0a33f4ff6d5e9ca547d6c18
Author: Aleksey Yeshchenko <al...@apache.org>
AuthorDate: Thu Jul 4 20:18:08 2019 +0100

    HIGHRES config.yml
---
 .circleci/config.yml | 96 ++++++++++++++++++++++++++--------------------------
 1 file changed, 48 insertions(+), 48 deletions(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 27fda51..8ac50c9 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -3,10 +3,10 @@ jobs:
   j11_unit_tests:
     docker:
     - image: spod/cassandra-testing-ubuntu1810-java11:20181210
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 100
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -79,18 +79,18 @@ jobs:
     - CASSANDRA_SKIP_SYNC: true
     - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
     - DTEST_BRANCH: master
-    - CCM_MAX_HEAP_SIZE: 1024M
-    - CCM_HEAP_NEWSIZE: 256M
+    - CCM_MAX_HEAP_SIZE: 2048M
+    - CCM_HEAP_NEWSIZE: 512M
     - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-11-openjdk-amd64
     - CASSANDRA_USE_JDK11: true
   j11_dtests-with-vnodes:
     docker:
     - image: spod/cassandra-testing-ubuntu1810-java11:20181210
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 100
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -184,18 +184,18 @@ jobs:
     - CASSANDRA_SKIP_SYNC: true
     - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
     - DTEST_BRANCH: master
-    - CCM_MAX_HEAP_SIZE: 1024M
-    - CCM_HEAP_NEWSIZE: 256M
+    - CCM_MAX_HEAP_SIZE: 2048M
+    - CCM_HEAP_NEWSIZE: 512M
     - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-11-openjdk-amd64
     - CASSANDRA_USE_JDK11: true
   j8_dtests-no-vnodes:
     docker:
     - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 100
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -268,17 +268,17 @@ jobs:
     - CASSANDRA_SKIP_SYNC: true
     - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
     - DTEST_BRANCH: master
-    - CCM_MAX_HEAP_SIZE: 1024M
-    - CCM_HEAP_NEWSIZE: 256M
+    - CCM_MAX_HEAP_SIZE: 2048M
+    - CCM_HEAP_NEWSIZE: 512M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
   j8_upgradetests-no-vnodes:
     docker:
     - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 100
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -366,14 +366,14 @@ jobs:
     - CASSANDRA_SKIP_SYNC: true
     - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
     - DTEST_BRANCH: master
-    - CCM_MAX_HEAP_SIZE: 1024M
-    - CCM_HEAP_NEWSIZE: 256M
+    - CCM_MAX_HEAP_SIZE: 2048M
+    - CCM_HEAP_NEWSIZE: 512M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
   utests_stress:
     docker:
     - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 1
@@ -408,17 +408,17 @@ jobs:
     - CASSANDRA_SKIP_SYNC: true
     - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
     - DTEST_BRANCH: master
-    - CCM_MAX_HEAP_SIZE: 1024M
-    - CCM_HEAP_NEWSIZE: 256M
+    - CCM_MAX_HEAP_SIZE: 2048M
+    - CCM_HEAP_NEWSIZE: 512M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
   j8_unit_tests:
     docker:
     - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 100
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -491,8 +491,8 @@ jobs:
     - CASSANDRA_SKIP_SYNC: true
     - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
     - DTEST_BRANCH: master
-    - CCM_MAX_HEAP_SIZE: 1024M
-    - CCM_HEAP_NEWSIZE: 256M
+    - CCM_MAX_HEAP_SIZE: 2048M
+    - CCM_HEAP_NEWSIZE: 512M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
   j11_build:
@@ -571,18 +571,18 @@ jobs:
     - CASSANDRA_SKIP_SYNC: true
     - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
     - DTEST_BRANCH: master
-    - CCM_MAX_HEAP_SIZE: 1024M
-    - CCM_HEAP_NEWSIZE: 256M
+    - CCM_MAX_HEAP_SIZE: 2048M
+    - CCM_HEAP_NEWSIZE: 512M
     - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-11-openjdk-amd64
     - CASSANDRA_USE_JDK11: true
   j8_dtests-with-vnodes:
     docker:
     - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 100
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -654,14 +654,14 @@ jobs:
     - CASSANDRA_SKIP_SYNC: true
     - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
     - DTEST_BRANCH: master
-    - CCM_MAX_HEAP_SIZE: 1024M
-    - CCM_HEAP_NEWSIZE: 256M
+    - CCM_MAX_HEAP_SIZE: 2048M
+    - CCM_HEAP_NEWSIZE: 512M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
   j8_jvm_dtests:
     docker:
     - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 1
@@ -696,8 +696,8 @@ jobs:
     - CASSANDRA_SKIP_SYNC: true
     - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
     - DTEST_BRANCH: master
-    - CCM_MAX_HEAP_SIZE: 1024M
-    - CCM_HEAP_NEWSIZE: 256M
+    - CCM_MAX_HEAP_SIZE: 2048M
+    - CCM_HEAP_NEWSIZE: 512M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
   j8_build:
@@ -776,14 +776,14 @@ jobs:
     - CASSANDRA_SKIP_SYNC: true
     - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
     - DTEST_BRANCH: master
-    - CCM_MAX_HEAP_SIZE: 1024M
-    - CCM_HEAP_NEWSIZE: 256M
+    - CCM_MAX_HEAP_SIZE: 2048M
+    - CCM_HEAP_NEWSIZE: 512M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
   utests_long:
     docker:
     - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 1
@@ -818,14 +818,14 @@ jobs:
     - CASSANDRA_SKIP_SYNC: true
     - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
     - DTEST_BRANCH: master
-    - CCM_MAX_HEAP_SIZE: 1024M
-    - CCM_HEAP_NEWSIZE: 256M
+    - CCM_MAX_HEAP_SIZE: 2048M
+    - CCM_HEAP_NEWSIZE: 512M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
   utests_fqltool:
     docker:
     - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
     parallelism: 1
@@ -860,17 +860,17 @@ jobs:
     - CASSANDRA_SKIP_SYNC: true
     - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
     - DTEST_BRANCH: master
-    - CCM_MAX_HEAP_SIZE: 1024M
-    - CCM_HEAP_NEWSIZE: 256M
+    - CCM_MAX_HEAP_SIZE: 2048M
+    - CCM_HEAP_NEWSIZE: 512M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
   j11_dtests-no-vnodes:
     docker:
     - image: spod/cassandra-testing-ubuntu1810-java11:20181210
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 100
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -965,18 +965,18 @@ jobs:
     - CASSANDRA_SKIP_SYNC: true
     - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
     - DTEST_BRANCH: master
-    - CCM_MAX_HEAP_SIZE: 1024M
-    - CCM_HEAP_NEWSIZE: 256M
+    - CCM_MAX_HEAP_SIZE: 2048M
+    - CCM_HEAP_NEWSIZE: 512M
     - JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-11-openjdk-amd64
     - CASSANDRA_USE_JDK11: true
   utests_compression:
     docker:
     - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 100
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -1049,8 +1049,8 @@ jobs:
     - CASSANDRA_SKIP_SYNC: true
     - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
     - DTEST_BRANCH: master
-    - CCM_MAX_HEAP_SIZE: 1024M
-    - CCM_HEAP_NEWSIZE: 256M
+    - CCM_MAX_HEAP_SIZE: 2048M
+    - CCM_HEAP_NEWSIZE: 512M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
 workflows:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 07/09: Test difference()

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch 15202-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 575c5fa2b2b25a140525b1e2c1ed465db13dac81
Author: Aleksey Yeshchenko <al...@apache.org>
AuthorDate: Wed Jul 31 16:17:13 2019 +0100

    Test difference()
---
 .../org/apache/cassandra/utils/MerkleTree.java     |  13 +-
 .../org/apache/cassandra/utils/MerkleTreeTest.java | 243 ++++++++++++++++++++-
 2 files changed, 243 insertions(+), 13 deletions(-)

diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 45d14a6..4c385ab 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -553,16 +553,20 @@ public class MerkleTree
          */
         public void addHash(RowHash entry)
         {
+            addHash(entry.hash, entry.size);
+        }
+
+        void addHash(byte[] hash, long partitionSize)
+        {
             assert tree != null : "Not intended for modification!";
 
             assert node instanceof OnHeapLeaf;
-            ((OnHeapLeaf) node).addHash(entry.hash, entry.size);
+            ((OnHeapLeaf) node).addHash(hash, partitionSize);
         }
 
         public void addAll(Iterator<RowHash> entries)
         {
-            while (entries.hasNext())
-                addHash(entries.next());
+            while (entries.hasNext()) addHash(entries.next());
         }
 
         @Override
@@ -759,7 +763,8 @@ public class MerkleTree
              : this;
     }
 
-    private MerkleTree moveOffHeap() throws IOException
+    @VisibleForTesting
+    MerkleTree moveOffHeap() throws IOException
     {
         assert root instanceof OnHeapNode;
         root.fillInnerHashes(); // ensure on-heap trees' inner node hashes have been computed
diff --git a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
index 0ec6858..582dcbf 100644
--- a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
@@ -17,12 +17,11 @@
  */
 package org.apache.cassandra.utils;
 
+import java.io.IOException;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.google.common.collect.Lists;
-
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,6 +40,7 @@ import org.apache.cassandra.utils.MerkleTree.RowHash;
 import org.apache.cassandra.utils.MerkleTree.TreeRange;
 import org.apache.cassandra.utils.MerkleTree.TreeRangeIterator;
 
+import static com.google.common.collect.Lists.newArrayList;
 import static org.apache.cassandra.utils.MerkleTree.RECOMMENDED_DEPTH;
 import static org.junit.Assert.*;
 
@@ -414,7 +414,7 @@ public class MerkleTreeTest
             MerkleTree.deserialize(new DataInputBuffer(serialized), false, MessagingService.current_version);
         MerkleTree restoredOffHeap =
             MerkleTree.deserialize(new DataInputBuffer(serialized), true, MessagingService.current_version);
-        MerkleTree movedOffHeap = mt.tryMoveOffHeap();
+        MerkleTree movedOffHeap = mt.moveOffHeap();
 
         assertHashEquals(initialhash, restoredOnHeap.hash(full));
         assertHashEquals(initialhash, restoredOffHeap.hash(full));
@@ -481,8 +481,8 @@ public class MerkleTreeTest
         MerkleTree rtree = new MerkleTree(partitioner, range, RECOMMENDED_DEPTH, 16);
         rtree.init();
 
-        byte[] h1 = "asdf".getBytes();
-        byte[] h2 = "hjkl".getBytes();
+        byte[] h1 = digest("asdf");
+        byte[] h2 = digest("hjkl");
 
         // add dummy hashes to both trees
         for (TreeRange tree : ltree.rangeIterator())
@@ -495,7 +495,7 @@ public class MerkleTreeTest
         }
 
         List<TreeRange> diffs = MerkleTree.difference(ltree, rtree);
-        assertEquals(Lists.newArrayList(range), diffs);
+        assertEquals(newArrayList(range), diffs);
         assertEquals(MerkleTree.Difference.FULLY_INCONSISTENT,
                      MerkleTree.differenceHelper(ltree, rtree, new ArrayList<>(), new MerkleTree.TreeRange(ltree.fullRange.left, ltree.fullRange.right, (byte)0)));
     }
@@ -515,8 +515,8 @@ public class MerkleTreeTest
         MerkleTree rtree = new MerkleTree(partitioner, range, RECOMMENDED_DEPTH, 16);
         rtree.init();
 
-        byte[] h1 = "asdf".getBytes();
-        byte[] h2 = "asdf".getBytes();
+        byte[] h1 = digest("asdf");
+        byte[] h2 = digest("asdf");
 
 
         // add dummy hashes to both trees
@@ -530,7 +530,7 @@ public class MerkleTreeTest
         }
 
         // top level difference() should show no differences
-        assertEquals(MerkleTree.difference(ltree, rtree), Lists.newArrayList());
+        assertEquals(MerkleTree.difference(ltree, rtree), newArrayList());
     }
 
     /**
@@ -664,4 +664,229 @@ public class MerkleTreeTest
         tree.hash(fullRange);
         return ObjectSizes.measureDeep(tree);
     }
+
+    @Test
+    public void testEqualTreesSameDepth() throws IOException
+    {
+        int seed = makeSeed();
+        Trees trees = Trees.make(seed, seed, 3, 3);
+        testDifferences(trees, Collections.emptyList());
+    }
+
+    @Test
+    public void testEqualTreesDifferentDepth() throws IOException
+    {
+        int seed = makeSeed();
+        Trees trees = Trees.make(seed, seed, 2, 3);
+        testDifferences(trees, Collections.emptyList());
+    }
+
+    @Test
+    public void testEntirelyDifferentTrees() throws IOException
+    {
+        int seed1 = makeSeed();
+        int seed2 = seed1 * 32;
+        Trees trees = Trees.make(seed1, seed2, 3, 3);
+        testDifferences(trees, newArrayList(makeTreeRange(0, 16, 0)));
+    }
+
+    @Test
+    public void testDifferentTrees1SameDepth() throws IOException
+    {
+        int seed = makeSeed();
+        Trees trees = Trees.make(seed, seed, 3, 3);
+        trees.tree1.get(longToken(1)).addHash(digest("diff_1"), 1);
+        testDifferences(trees, newArrayList(makeTreeRange(0, 2, 3)));
+    }
+
+    @Test
+    public void testDifferentTrees1DifferentDepth() throws IOException
+    {
+        int seed = makeSeed();
+        Trees trees = Trees.make(seed, seed, 2, 3);
+        trees.tree1.get(longToken(1)).addHash(digest("diff_1"), 1);
+        testDifferences(trees, newArrayList(makeTreeRange(0, 4, 2)));
+    }
+
+    @Test
+    public void testDifferentTrees2SameDepth() throws IOException
+    {
+        int seed = makeSeed();
+        Trees trees = Trees.make(seed, seed, 3, 3);
+        trees.tree1.get(longToken(1)).addHash(digest("diff_1"), 1);
+        trees.tree2.get(longToken(16)).addHash(digest("diff_16"), 1);
+        testDifferences(trees, newArrayList(makeTreeRange(0,  2,  3),
+                                            makeTreeRange(14, 16, 3)));
+    }
+
+    @Test
+    public void testDifferentTrees2DifferentDepth() throws IOException
+    {
+        int seed = makeSeed();
+        Trees trees = Trees.make(seed, seed, 2, 3);
+        trees.tree1.get(longToken(1)).addHash(digest("diff_1"), 1);
+        trees.tree2.get(longToken(16)).addHash(digest("diff_16"), 1);
+        testDifferences(trees, newArrayList(makeTreeRange(0,  4,  2),
+                                            makeTreeRange(12, 16, 2)));
+    }
+
+    @Test
+    public void testDifferentTrees3SameDepth() throws IOException
+    {
+        int seed = makeSeed();
+        Trees trees = Trees.make(seed, seed, 3, 3);
+        trees.tree1.get(longToken(1)).addHash(digest("diff_1"), 1);
+        trees.tree1.get(longToken(3)).addHash(digest("diff_3"), 1);
+        testDifferences(trees, newArrayList(makeTreeRange(0,  4,  2)));
+    }
+
+    @Test
+    public void testDifferentTrees3Differentepth() throws IOException
+    {
+        int seed = makeSeed();
+        Trees trees = Trees.make(seed, seed, 2, 3);
+        trees.tree1.get(longToken(1)).addHash(digest("diff_1"), 1);
+        trees.tree1.get(longToken(3)).addHash(digest("diff_3"), 1);
+        testDifferences(trees, newArrayList(makeTreeRange(0,  4,  2)));
+    }
+
+    @Test
+    public void testDifferentTrees4SameDepth() throws IOException
+    {
+        int seed = makeSeed();
+        Trees trees = Trees.make(seed, seed, 3, 3);
+        trees.tree1.get(longToken(4)).addHash(digest("diff_4"), 1);
+        trees.tree1.get(longToken(8)).addHash(digest("diff_8"), 1);
+        trees.tree1.get(longToken(12)).addHash(digest("diff_12"), 1);
+        trees.tree1.get(longToken(16)).addHash(digest("diff_16"), 1);
+        testDifferences(trees, newArrayList(makeTreeRange(2,  4,  3),
+                                            makeTreeRange(6,  8,  3),
+                                            makeTreeRange(10, 12, 3),
+                                            makeTreeRange(14, 16, 3)));
+    }
+
+    @Test
+    public void testDifferentTrees4DifferentDepth() throws IOException
+    {
+        int seed = makeSeed();
+        Trees trees = Trees.make(seed, seed, 2, 3);
+        trees.tree1.get(longToken(4)).addHash(digest("diff_4"), 1);
+        trees.tree1.get(longToken(8)).addHash(digest("diff_8"), 1);
+        trees.tree1.get(longToken(12)).addHash(digest("diff_12"), 1);
+        trees.tree1.get(longToken(16)).addHash(digest("diff_16"), 1);
+        testDifferences(trees, newArrayList(makeTreeRange(0, 16, 0)));
+    }
+
+    private static void testDifferences(Trees trees, List<TreeRange> expectedDifference) throws IOException
+    {
+        MerkleTree mt1 = trees.tree1;
+        MerkleTree mt2 = trees.tree2;
+
+        assertDiffer(mt1,               mt2,               expectedDifference);
+        assertDiffer(mt1,               mt2.moveOffHeap(), expectedDifference);
+        assertDiffer(mt1,               cycle(mt2, true),  expectedDifference);
+        assertDiffer(mt1,               cycle(mt2, false), expectedDifference);
+
+        assertDiffer(mt1.moveOffHeap(), mt2,               expectedDifference);
+        assertDiffer(mt1.moveOffHeap(), mt2.moveOffHeap(), expectedDifference);
+        assertDiffer(mt1.moveOffHeap(), cycle(mt2, true),  expectedDifference);
+        assertDiffer(mt1.moveOffHeap(), cycle(mt2, false), expectedDifference);
+
+        assertDiffer(cycle(mt1, true),  mt2,               expectedDifference);
+        assertDiffer(cycle(mt1, true),  mt2.moveOffHeap(), expectedDifference);
+        assertDiffer(cycle(mt1, true),  cycle(mt2, true),  expectedDifference);
+        assertDiffer(cycle(mt1, true),  cycle(mt2, false), expectedDifference);
+
+        assertDiffer(cycle(mt1, false), mt2,               expectedDifference);
+        assertDiffer(cycle(mt1, false), mt2.moveOffHeap(), expectedDifference);
+        assertDiffer(cycle(mt1, false), cycle(mt2, true),  expectedDifference);
+        assertDiffer(cycle(mt1, false), cycle(mt2, false), expectedDifference);
+    }
+
+    private static void assertDiffer(MerkleTree mt1, MerkleTree mt2, List<TreeRange> expectedDifference)
+    {
+        assertEquals(expectedDifference, MerkleTree.difference(mt1, mt2));
+        assertEquals(expectedDifference, MerkleTree.difference(mt2, mt1));
+    }
+
+    private static Range<Token> longTokenRange(long start, long end)
+    {
+        return new Range<>(longToken(start), longToken(end));
+    }
+
+    private static Murmur3Partitioner.LongToken longToken(long value)
+    {
+        return new Murmur3Partitioner.LongToken(value);
+    }
+
+    private static MerkleTree cycle(MerkleTree mt, boolean offHeapRequested) throws IOException
+    {
+        try (DataOutputBuffer output = new DataOutputBuffer())
+        {
+            mt.serialize(output, MessagingService.current_version);
+
+            try (DataInputBuffer input = new DataInputBuffer(output.buffer(false), false))
+            {
+                return MerkleTree.deserialize(input, offHeapRequested, MessagingService.current_version);
+            }
+        }
+    }
+
+    private static MerkleTree makeTree(long start, long end, int depth)
+    {
+        MerkleTree mt = new MerkleTree(Murmur3Partitioner.instance, longTokenRange(start, end), depth, Long.MAX_VALUE);
+        mt.init();
+        return mt;
+    }
+
+    private static TreeRange makeTreeRange(long start, long end, int depth)
+    {
+        return new TreeRange(longToken(start), longToken(end), depth);
+    }
+
+    private static byte[][] makeHashes(int count, int seed)
+    {
+        Random random = new Random(seed);
+
+        byte[][] hashes = new byte[count][32];
+        for (int i = 0; i < count; i++)
+            random.nextBytes(hashes[i]);
+        return hashes;
+    }
+
+    private static int makeSeed()
+    {
+        int seed = (int) System.currentTimeMillis();
+        System.out.println("Using seed " + seed);
+        return seed;
+    }
+
+    private static class Trees
+    {
+        MerkleTree tree1;
+        MerkleTree tree2;
+
+        Trees(MerkleTree tree1, MerkleTree tree2)
+        {
+            this.tree1 = tree1;
+            this.tree2 = tree2;
+        }
+
+        static Trees make(int hashes1seed, int hashes2seed, int tree1depth, int tree2depth)
+        {
+            byte[][] hashes1 = makeHashes(16, hashes1seed);
+            byte[][] hashes2 = makeHashes(16, hashes2seed);
+
+            MerkleTree tree1 = makeTree(0, 16, tree1depth);
+            MerkleTree tree2 = makeTree(0, 16, tree2depth);
+
+            for (int tok = 1; tok <= 16; tok++)
+            {
+                tree1.get(longToken(tok)).addHash(hashes1[tok - 1], 1);
+                tree2.get(longToken(tok)).addHash(hashes2[tok - 1], 1);
+            }
+
+            return new Trees(tree1, tree2);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 02/09: Make repair coordination less expensive by moving MerkleTrees off heap

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch 15202-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit be64c1c26a879bad97390ad3d38e2ed03806c8c8
Author: Jeff Jirsa <jj...@apple.com>
AuthorDate: Sat Mar 16 17:30:54 2019 +0000

    Make repair coordination less expensive by moving MerkleTrees off heap
    
    patch by Aleksey Yeschenko and Jeff Jirsa; reviewed by Benedict Elliott
    Smith and Marcus Eriksson for CASSANDRA-15202
    
    Co-authored-by: Aleksey Yeschenko <al...@apache.org>
    Co-authored-by: Jeff Jirsa <jj...@apple.com>
---
 src/java/org/apache/cassandra/config/Config.java   |    2 +
 .../cassandra/config/DatabaseDescriptor.java       |   11 +
 .../cassandra/dht/ByteOrderedPartitioner.java      |    6 +
 .../org/apache/cassandra/dht/IPartitioner.java     |    5 +
 .../apache/cassandra/dht/Murmur3Partitioner.java   |   33 +
 .../apache/cassandra/dht/RandomPartitioner.java    |   32 +
 src/java/org/apache/cassandra/dht/Token.java       |   40 +-
 .../org/apache/cassandra/repair/RepairJob.java     |    3 +
 .../org/apache/cassandra/repair/Validator.java     |   73 +-
 .../repair/messages/ValidationComplete.java        |    9 +
 .../cassandra/service/ActiveRepairService.java     |   10 +
 .../org/apache/cassandra/utils/ByteBufferUtil.java |   30 +-
 .../org/apache/cassandra/utils/FBUtilities.java    |   16 +
 .../apache/cassandra/utils/FastByteOperations.java |   46 +-
 .../org/apache/cassandra/utils/MerkleTree.java     | 1406 +++++++++++++-------
 .../org/apache/cassandra/utils/MerkleTrees.java    |   63 +-
 .../apache/cassandra/repair/LocalSyncTaskTest.java |    4 -
 .../org/apache/cassandra/repair/RepairJobTest.java |    4 -
 .../org/apache/cassandra/repair/ValidatorTest.java |   18 +-
 .../repair/asymmetric/DifferenceHolderTest.java    |    6 +-
 .../org/apache/cassandra/utils/MerkleTreeTest.java |  127 +-
 .../apache/cassandra/utils/MerkleTreesTest.java    |   99 +-
 22 files changed, 1344 insertions(+), 699 deletions(-)

diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 34a5ce8..b56419d 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -130,6 +130,8 @@ public class Config
     public volatile Integer repair_session_max_tree_depth = null;
     public volatile Integer repair_session_space_in_mb = null;
 
+    public volatile boolean prefer_offheap_merkle_trees = true;
+
     public int storage_port = 7000;
     public int ssl_storage_port = 7001;
     public String listen_address;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 0166c5f..49761da 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2907,4 +2907,15 @@ public class DatabaseDescriptor
     {
         return strictRuntimeChecks;
     }
+
+    public static boolean getOffheapMerkleTreesEnabled()
+    {
+        return conf.prefer_offheap_merkle_trees;
+    }
+
+    public static void setOffheapMerkleTreesEnabled(boolean value)
+    {
+        logger.info("Setting prefer_offheap_merkle_trees to {}", value);
+        conf.prefer_offheap_merkle_trees = value;
+    }
 }
diff --git a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
index 08088f7..a6314dc 100644
--- a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
@@ -234,6 +234,12 @@ public class ByteOrderedPartitioner implements IPartitioner
             return new BytesToken(bytes);
         }
 
+        @Override
+        public int byteSize(Token token)
+        {
+            return ((BytesToken) token).token.length;
+        }
+
         public String toString(Token token)
         {
             BytesToken bytesToken = (BytesToken) token;
diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java b/src/java/org/apache/cassandra/dht/IPartitioner.java
index f433f20..ef8ced2 100644
--- a/src/java/org/apache/cassandra/dht/IPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/IPartitioner.java
@@ -135,4 +135,9 @@ public interface IPartitioner
     {
         return Optional.empty();
     }
+
+    default public int getMaxTokenSize()
+    {
+        return Integer.MIN_VALUE;
+    }
 }
diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
index 0f922e3..52d0efb 100644
--- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
+++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.dht;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
@@ -25,10 +26,12 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.PreHashedDecoratedKey;
+import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.PartitionerDefinedOrder;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.MurmurHash;
 import org.apache.cassandra.utils.ObjectSizes;
@@ -42,6 +45,7 @@ public class Murmur3Partitioner implements IPartitioner
 {
     public static final LongToken MINIMUM = new LongToken(Long.MIN_VALUE);
     public static final long MAXIMUM = Long.MAX_VALUE;
+    private static final int MAXIMUM_TOKEN_SIZE = TypeSizes.sizeof(MAXIMUM);
 
     private static final int HEAP_SIZE = (int) ObjectSizes.measureDeep(MINIMUM);
 
@@ -224,6 +228,11 @@ public class Murmur3Partitioner implements IPartitioner
         return new LongToken(normalize(hash[0]));
     }
 
+    public int getMaxTokenSize()
+    {
+        return MAXIMUM_TOKEN_SIZE;
+    }
+
     private long[] getHash(ByteBuffer key)
     {
         long[] hash = new long[2];
@@ -300,11 +309,35 @@ public class Murmur3Partitioner implements IPartitioner
             return ByteBufferUtil.bytes(longToken.token);
         }
 
+        @Override
+        public void serialize(Token token, DataOutputPlus out) throws IOException
+        {
+            out.writeLong(((LongToken) token).token);
+        }
+
+        @Override
+        public void serialize(Token token, ByteBuffer out)
+        {
+            out.putLong(((LongToken) token).token);
+        }
+
+        @Override
+        public int byteSize(Token token)
+        {
+            return 8;
+        }
+
         public Token fromByteArray(ByteBuffer bytes)
         {
             return new LongToken(ByteBufferUtil.toLong(bytes));
         }
 
+        @Override
+        public Token fromByteBuffer(ByteBuffer bytes, int position, int length)
+        {
+            return new LongToken(bytes.getLong(position));
+        }
+
         public String toString(Token token)
         {
             return token.toString();
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index 4e63475..0457a89 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.dht;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
@@ -31,6 +32,7 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.db.marshal.PartitionerDefinedOrder;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.GuidGenerator;
@@ -46,6 +48,7 @@ public class RandomPartitioner implements IPartitioner
     public static final BigInteger ZERO = new BigInteger("0");
     public static final BigIntegerToken MINIMUM = new BigIntegerToken("-1");
     public static final BigInteger MAXIMUM = new BigInteger("2").pow(127);
+    public static final int MAXIMUM_TOKEN_SIZE = MAXIMUM.bitLength() / 8 + 1;
 
     /**
      * Maintain a separate threadlocal message digest, exclusively for token hashing. This is necessary because
@@ -162,11 +165,35 @@ public class RandomPartitioner implements IPartitioner
             return ByteBuffer.wrap(bigIntegerToken.token.toByteArray());
         }
 
+        @Override
+        public void serialize(Token token, DataOutputPlus out) throws IOException
+        {
+            out.write(((BigIntegerToken) token).token.toByteArray());
+        }
+
+        @Override
+        public void serialize(Token token, ByteBuffer out)
+        {
+            out.put(((BigIntegerToken) token).token.toByteArray());
+        }
+
+        @Override
+        public int byteSize(Token token)
+        {
+            return ((BigIntegerToken) token).token.bitLength() / 8 + 1;
+        }
+
         public Token fromByteArray(ByteBuffer bytes)
         {
             return new BigIntegerToken(new BigInteger(ByteBufferUtil.getArray(bytes)));
         }
 
+        @Override
+        public Token fromByteBuffer(ByteBuffer bytes, int position, int length)
+        {
+            return new BigIntegerToken(new BigInteger(ByteBufferUtil.getArray(bytes, position, length)));
+        }
+
         public String toString(Token token)
         {
             BigIntegerToken bigIntegerToken = (BigIntegerToken) token;
@@ -252,6 +279,11 @@ public class RandomPartitioner implements IPartitioner
         return new BigIntegerToken(hashToBigInteger(key));
     }
 
+    public int getMaxTokenSize()
+    {
+        return MAXIMUM_TOKEN_SIZE;
+    }
+
     public Map<Token, Float> describeOwnership(List<Token> sortedTokens)
     {
         Map<Token, Float> ownerships = new HashMap<Token, Float>();
diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java
index 20b45ef..ccb66fd 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -26,7 +26,6 @@ import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 public abstract class Token implements RingPosition<Token>, Serializable
 {
@@ -40,8 +39,30 @@ public abstract class Token implements RingPosition<Token>, Serializable
         public abstract Token fromByteArray(ByteBuffer bytes);
         public abstract String toString(Token token); // serialize as string, not necessarily human-readable
         public abstract Token fromString(String string); // deserialize
-
         public abstract void validate(String token) throws ConfigurationException;
+
+        public void serialize(Token token, DataOutputPlus out) throws IOException
+        {
+            out.write(toByteArray(token));
+        }
+
+        public void serialize(Token token, ByteBuffer out) throws IOException
+        {
+            out.put(toByteArray(token));
+        }
+
+        public Token fromByteBuffer(ByteBuffer bytes, int position, int length)
+        {
+            bytes = bytes.duplicate();
+            bytes.position(position)
+                 .limit(position + length);
+            return fromByteArray(bytes);
+        }
+
+        public int byteSize(Token token)
+        {
+            return toByteArray(token).remaining();
+        }
     }
 
     public static class TokenSerializer implements IPartitionerDependentSerializer<Token>
@@ -49,23 +70,28 @@ public abstract class Token implements RingPosition<Token>, Serializable
         public void serialize(Token token, DataOutputPlus out, int version) throws IOException
         {
             IPartitioner p = token.getPartitioner();
-            ByteBuffer b = p.getTokenFactory().toByteArray(token);
-            ByteBufferUtil.writeWithLength(b, out);
+            out.writeInt(p.getTokenFactory().byteSize(token));
+            p.getTokenFactory().serialize(token, out);
         }
 
         public Token deserialize(DataInput in, IPartitioner p, int version) throws IOException
         {
-            int size = in.readInt();
+            int size = deserializeSize(in);
             byte[] bytes = new byte[size];
             in.readFully(bytes);
             return p.getTokenFactory().fromByteArray(ByteBuffer.wrap(bytes));
         }
 
+        public int deserializeSize(DataInput in) throws IOException
+        {
+            return in.readInt();
+        }
+
         public long serializedSize(Token object, int version)
         {
             IPartitioner p = object.getPartitioner();
-            ByteBuffer b = p.getTokenFactory().toByteArray(object);
-            return TypeSizes.sizeof(b.remaining()) + b.remaining();
+            int byteSize = p.getTokenFactory().byteSize(object);
+            return TypeSizes.sizeof(byteSize) + byteSize;
         }
     }
 
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index a67aac0..f682bfb 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -236,7 +236,10 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
                 }
                 syncTasks.add(task);
             }
+            trees.get(i).trees.release();
         }
+        trees.get(trees.size() - 1).trees.release();
+
         return syncTasks;
     }
 
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index c8becaa..9a89fa6 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.repair;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -29,7 +30,6 @@ import com.google.common.hash.HashCode;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +46,7 @@ import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.ValidationComplete;
 import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
@@ -149,7 +150,7 @@ public class Validator implements Runnable
             }
         }
         logger.debug("Prepared AEService trees of size {} for {}", trees.size(), desc);
-        ranges = tree.invalids();
+        ranges = tree.rangeIterator();
     }
 
     /**
@@ -172,7 +173,7 @@ public class Validator implements Runnable
         if (!findCorrectRange(lastKey.getToken()))
         {
             // add the empty hash, and move to the next range
-            ranges = trees.invalids();
+            ranges = trees.rangeIterator();
             findCorrectRange(lastKey.getToken());
         }
 
@@ -368,9 +369,7 @@ public class Validator implements Runnable
      */
     public void complete()
     {
-        completeTree();
-
-        StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
+        assert ranges != null : "Validator was not prepared()";
 
         if (logger.isDebugEnabled())
         {
@@ -380,20 +379,8 @@ public class Validator implements Runnable
             logger.debug("Validated {} partitions for {}.  Partition sizes are:", validated, desc.sessionId);
             trees.logRowSizePerLeaf(logger);
         }
-    }
-
-    @VisibleForTesting
-    public void completeTree()
-    {
-        assert ranges != null : "Validator was not prepared()";
-
-        ranges = trees.invalids();
 
-        while (ranges.hasNext())
-        {
-            range = ranges.next();
-            range.ensureHashInitialised();
-        }
+        StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
     }
 
     /**
@@ -404,8 +391,7 @@ public class Validator implements Runnable
     public void fail()
     {
         logger.error("Failed creating a merkle tree for {}, {} (see log for details)", desc, initiator);
-        // send fail message only to nodes >= version 2.0
-        MessagingService.instance().send(Message.out(REPAIR_REQ, new ValidationComplete(desc)), initiator);
+        respond(new ValidationComplete(desc));
     }
 
     /**
@@ -413,12 +399,51 @@ public class Validator implements Runnable
      */
     public void run()
     {
-        // respond to the request that triggered this validation
-        if (!initiator.equals(FBUtilities.getBroadcastAddressAndPort()))
+        if (initiatorIsRemote())
         {
             logger.info("{} Sending completed merkle tree to {} for {}.{}", previewKind.logPrefix(desc.sessionId), initiator, desc.keyspace, desc.columnFamily);
             Tracing.traceRepair("Sending completed merkle tree to {} for {}.{}", initiator, desc.keyspace, desc.columnFamily);
         }
-        MessagingService.instance().send(Message.out(REPAIR_REQ, new ValidationComplete(desc, trees)), initiator);
+        else
+        {
+            logger.info("{} Local completed merkle tree for {} for {}.{}", previewKind.logPrefix(desc.sessionId), initiator, desc.keyspace, desc.columnFamily);
+            Tracing.traceRepair("Local completed merkle tree for {} for {}.{}", initiator, desc.keyspace, desc.columnFamily);
+
+        }
+        respond(new ValidationComplete(desc, trees));
+    }
+
+    private boolean initiatorIsRemote()
+    {
+        return !FBUtilities.getBroadcastAddressAndPort().equals(initiator);
+    }
+
+    private void respond(ValidationComplete response)
+    {
+        if (initiatorIsRemote())
+        {
+            MessagingService.instance().send(Message.out(REPAIR_REQ, response), initiator);
+            return;
+        }
+
+        /*
+         * For local initiators, DO NOT send the message to self over loopback. This is a wasted ser/de loop
+         * and a ton of garbage. Instead, move the trees off heap and invoke message handler. We could do it
+         * directly, since this method will only be called from {@code Stage.ENTI_ENTROPY}, but we do instead
+         * execute a {@code Runnable} on the stage - in case that assumption ever changes by accident.
+         */
+        StageManager.getStage(Stage.ANTI_ENTROPY).execute(() ->
+        {
+            ValidationComplete movedResponse = response;
+            try
+            {
+                movedResponse = response.tryMoveOffHeap();
+            }
+            catch (IOException e)
+            {
+                logger.error("Failed to move local merkle tree for {} off heap", desc, e);
+            }
+            ActiveRepairService.instance.handleMessage(initiator, movedResponse);
+        });
     }
 }
diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
index 704bffb..b8aa736 100644
--- a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
@@ -56,6 +56,15 @@ public class ValidationComplete extends RepairMessage
         return trees != null;
     }
 
+    /**
+     * @return a new {@link ValidationComplete} instance with all trees moved off heap, or {@code this}
+     * if it's a failure response.
+     */
+    public ValidationComplete tryMoveOffHeap() throws IOException
+    {
+        return trees == null ? this : new ValidationComplete(desc, trees.tryMoveOffHeap());
+    }
+
     @Override
     public boolean equals(Object o)
     {
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index abfd6d9..689771c 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -248,6 +248,16 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         return session;
     }
 
+    public boolean getOffheapMerkleTreesEnabled()
+    {
+        return DatabaseDescriptor.getOffheapMerkleTreesEnabled();
+    }
+
+    public void setOffheapMerkleTreesEnabled(boolean enabled)
+    {
+        DatabaseDescriptor.setOffheapMerkleTreesEnabled(enabled);
+    }
+
     private <T extends AbstractFuture &
                IEndpointStateChangeSubscriber &
                IFailureDetectionEventListener> void registerOnFdAndGossip(final T task)
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 788300c..518436e 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -101,6 +101,16 @@ public class ByteBufferUtil
         return FastByteOperations.compareUnsigned(o1, o2, 0, o2.length);
     }
 
+    public static int compare(ByteBuffer o1, int s1, int l1, byte[] o2)
+    {
+        return FastByteOperations.compareUnsigned(o1, s1, l1, o2, 0, o2.length);
+    }
+
+    public static int compare(byte[] o1, ByteBuffer o2, int s2, int l2)
+    {
+        return FastByteOperations.compareUnsigned(o1, 0, o1.length, o2, s2, l2);
+    }
+
     /**
      * Decode a String representation.
      * This method assumes that the encoding charset is UTF_8.
@@ -161,16 +171,25 @@ public class ByteBufferUtil
      */
     public static byte[] getArray(ByteBuffer buffer)
     {
-        int length = buffer.remaining();
+        return getArray(buffer, buffer.position(), buffer.remaining());
+    }
+
+    /**
+     * You should almost never use this.  Instead, use the write* methods to avoid copies.
+     */
+    public static byte[] getArray(ByteBuffer buffer, int position, int length)
+    {
         if (buffer.hasArray())
         {
-            int boff = buffer.arrayOffset() + buffer.position();
+            int boff = buffer.arrayOffset() + position;
             return Arrays.copyOfRange(buffer.array(), boff, boff + length);
         }
+
         // else, DirectByteBuffer.get() is the fastest route
         byte[] bytes = new byte[length];
-        buffer.duplicate().get(bytes);
-
+        ByteBuffer dup = buffer.duplicate();
+        dup.position(position).limit(position + length);
+        dup.get(bytes);
         return bytes;
     }
 
@@ -631,6 +650,7 @@ public class ByteBufferUtil
 
         assert bytes1.limit() >= offset1 + length : "The first byte array isn't long enough for the specified offset and length.";
         assert bytes2.limit() >= offset2 + length : "The second byte array isn't long enough for the specified offset and length.";
+
         for (int i = 0; i < length; i++)
         {
             byte byte1 = bytes1.get(offset1 + i);
@@ -669,7 +689,7 @@ public class ByteBufferUtil
         return buf.capacity() > buf.remaining() || !buf.hasArray() ? ByteBuffer.wrap(getArray(buf)) : buf;
     }
 
-    // Doesn't change bb position
+    // doesn't change bb position
     public static int getShortLength(ByteBuffer bb, int position)
     {
         int length = (bb.get(position) & 0xFF) << 8;
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index c37dcca..0f7b2ca 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -289,6 +289,22 @@ public class FBUtilities
         return out;
     }
 
+    /**
+     * Bitwise XOR of the inputs, in place on the left array
+     *
+     * Assumes inputs are same length
+     */
+    static void xorOntoLeft(byte[] left, byte[] right)
+    {
+        if (left == null || right == null)
+            return;
+
+        assert left.length == right.length;
+
+        for (int i = 0; i < left.length; i++)
+            left[i] = (byte) ((left[i] & 0xFF) ^ (right[i] & 0xFF));
+    }
+
     public static void sortSampledKeys(List<DecoratedKey> keys, Range<Token> range)
     {
         if (range.left.compareTo(range.right) >= 0)
diff --git a/src/java/org/apache/cassandra/utils/FastByteOperations.java b/src/java/org/apache/cassandra/utils/FastByteOperations.java
index 6581736..060dee5 100644
--- a/src/java/org/apache/cassandra/utils/FastByteOperations.java
+++ b/src/java/org/apache/cassandra/utils/FastByteOperations.java
@@ -55,6 +55,16 @@ public class FastByteOperations
         return -BestHolder.BEST.compare(b2, b1, s1, l1);
     }
 
+    public static int compareUnsigned(ByteBuffer b1, int s1, int l1, byte[] b2, int s2, int l2)
+    {
+        return BestHolder.BEST.compare(b1, s1, l1, b2, s2, l2);
+    }
+
+    public static int compareUnsigned(byte[] b1, int s1, int l1, ByteBuffer b2, int s2, int l2)
+    {
+        return -BestHolder.BEST.compare(b2, s2, l2, b1, s1, l1);
+    }
+
     public static int compareUnsigned(ByteBuffer b1, ByteBuffer b2)
     {
         return BestHolder.BEST.compare(b1, b2);
@@ -77,6 +87,8 @@ public class FastByteOperations
 
         abstract public int compare(ByteBuffer buffer1, byte[] buffer2, int offset2, int length2);
 
+        abstract public int compare(ByteBuffer buffer1, int offset1, int length1, byte[] buffer2, int offset2, int length2);
+
         abstract public int compare(ByteBuffer buffer1, ByteBuffer buffer2);
 
         abstract public void copy(ByteBuffer src, int srcPosition, byte[] trg, int trgPosition, int length);
@@ -187,25 +199,24 @@ public class FastByteOperations
 
         public int compare(ByteBuffer buffer1, byte[] buffer2, int offset2, int length2)
         {
+            return compare(buffer1, buffer1.position(), buffer1.remaining(), buffer2, offset2, length2);
+        }
+
+        public int compare(ByteBuffer buffer1, int position1, int length1, byte[] buffer2, int offset2, int length2)
+        {
             Object obj1;
             long offset1;
             if (buffer1.hasArray())
             {
                 obj1 = buffer1.array();
-                offset1 = BYTE_ARRAY_BASE_OFFSET + buffer1.arrayOffset();
+                offset1 = BYTE_ARRAY_BASE_OFFSET + buffer1.arrayOffset() + position1;
             }
             else
             {
                 obj1 = null;
-                offset1 = theUnsafe.getLong(buffer1, DIRECT_BUFFER_ADDRESS_OFFSET);
-            }
-            int length1;
-            {
-                int position = buffer1.position();
-                int limit = buffer1.limit();
-                length1 = limit - position;
-                offset1 += position;
+                offset1 = theUnsafe.getLong(buffer1, DIRECT_BUFFER_ADDRESS_OFFSET) + position1;
             }
+
             return compareTo(obj1, offset1, length1, buffer2, BYTE_ARRAY_BASE_OFFSET + offset2, length2);
         }
 
@@ -397,11 +408,28 @@ public class FastByteOperations
             return length1 - length2;
         }
 
+        public int compare(ByteBuffer buffer1, int position1, int length1, byte[] buffer2, int offset2, int length2)
+        {
+            if (buffer1.hasArray())
+                return compare(buffer1.array(), buffer1.arrayOffset() + position1, length1, buffer2, offset2, length2);
+
+            if (position1 != buffer1.position())
+            {
+                buffer1 = buffer1.duplicate();
+                buffer1.position(position1);
+            }
+
+            return compare(buffer1, ByteBuffer.wrap(buffer2, offset2, length2));
+        }
+
         public int compare(ByteBuffer buffer1, byte[] buffer2, int offset2, int length2)
         {
             if (buffer1.hasArray())
+            {
                 return compare(buffer1.array(), buffer1.arrayOffset() + buffer1.position(), buffer1.remaining(),
                                buffer2, offset2, length2);
+            }
+
             return compare(buffer1, ByteBuffer.wrap(buffer2, offset2, length2));
         }
 
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index d131ff5..9d9eadb 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -19,25 +19,33 @@ package org.apache.cassandra.utils;
 
 import java.io.DataInput;
 import java.io.IOException;
-import java.io.Serializable;
+import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.PeekingIterator;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Shorts;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.IPartitionerDependentSerializer;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static org.apache.cassandra.utils.ByteBufferUtil.compare;
 
 /**
  * A MerkleTree implemented as a binary tree.
@@ -59,84 +67,51 @@ import org.apache.cassandra.io.util.DataOutputPlus;
  * If two MerkleTrees have the same hashdepth, they represent a perfect tree
  * of the same depth, and can always be compared, regardless of size or splits.
  */
-public class MerkleTree implements Serializable
+public class MerkleTree
 {
-    private static Logger logger = LoggerFactory.getLogger(MerkleTree.class);
+    private static final Logger logger = LoggerFactory.getLogger(MerkleTree.class);
+
+    private static final int HASH_SIZE = 32; // SHA-256 = 32 bytes.
+    private static final byte[] EMPTY_HASH = new byte[HASH_SIZE];
+
+    /*
+     * Thread-local byte array, large enough to host SHA256 bytes or MM3/Random partitoners' tokens
+     */
+    private static final ThreadLocal<byte[]> byteArray = ThreadLocal.withInitial(() -> new byte[HASH_SIZE]);
 
-    public static final MerkleTreeSerializer serializer = new MerkleTreeSerializer();
-    private static final long serialVersionUID = 2L;
+    private static byte[] getTempArray(int minimumSize)
+    {
+        return minimumSize <= HASH_SIZE ? byteArray.get() : new byte[minimumSize];
+    }
 
     public static final byte RECOMMENDED_DEPTH = Byte.MAX_VALUE - 1;
 
-    public static final int CONSISTENT = 0;
-    public static final int FULLY_INCONSISTENT = 1;
-    public static final int PARTIALLY_INCONSISTENT = 2;
-    private static final byte[] EMPTY_HASH = new byte[0];
+    @SuppressWarnings("WeakerAccess")
+    static final int CONSISTENT = 0;
+    static final int FULLY_INCONSISTENT = 1;
+    @SuppressWarnings("WeakerAccess")
+    static final int PARTIALLY_INCONSISTENT = 2;
 
-    public final byte hashdepth;
+    private final int hashdepth;
 
     /** The top level range that this MerkleTree covers. */
-    public final Range<Token> fullRange;
+    final Range<Token> fullRange;
     private final IPartitioner partitioner;
 
     private long maxsize;
     private long size;
-    private Hashable root;
+    private Node root;
 
-    public static class MerkleTreeSerializer implements IVersionedSerializer<MerkleTree>
+    /**
+     * @param partitioner The partitioner in use.
+     * @param range the range this tree covers
+     * @param hashdepth The maximum depth of the tree. 100/(2^depth) is the %
+     *        of the key space covered by each subrange of a fully populated tree.
+     * @param maxsize The maximum number of subranges in the tree.
+     */
+    public MerkleTree(IPartitioner partitioner, Range<Token> range, int hashdepth, long maxsize)
     {
-        public void serialize(MerkleTree mt, DataOutputPlus out, int version) throws IOException
-        {
-            out.writeByte(mt.hashdepth);
-            out.writeLong(mt.maxsize);
-            out.writeLong(mt.size);
-            out.writeUTF(mt.partitioner.getClass().getCanonicalName());
-            // full range
-            Token.serializer.serialize(mt.fullRange.left, out, version);
-            Token.serializer.serialize(mt.fullRange.right, out, version);
-            Hashable.serializer.serialize(mt.root, out, version);
-        }
-
-        public MerkleTree deserialize(DataInputPlus in, int version) throws IOException
-        {
-            byte hashdepth = in.readByte();
-            long maxsize = in.readLong();
-            long size = in.readLong();
-            IPartitioner partitioner;
-            try
-            {
-                partitioner = FBUtilities.newPartitioner(in.readUTF());
-            }
-            catch (ConfigurationException e)
-            {
-                throw new IOException(e);
-            }
-
-            // full range
-            Token left = Token.serializer.deserialize(in, partitioner, version);
-            Token right = Token.serializer.deserialize(in, partitioner, version);
-            Range<Token> fullRange = new Range<>(left, right);
-
-            MerkleTree mt = new MerkleTree(partitioner, fullRange, hashdepth, maxsize);
-            mt.size = size;
-            mt.root = Hashable.serializer.deserialize(in, partitioner, version);
-            return mt;
-        }
-
-        public long serializedSize(MerkleTree mt, int version)
-        {
-            long size = 1 // mt.hashdepth
-                 + TypeSizes.sizeof(mt.maxsize)
-                 + TypeSizes.sizeof(mt.size)
-                 + TypeSizes.sizeof(mt.partitioner.getClass().getCanonicalName());
-
-            // full range
-            size += Token.serializer.serializedSize(mt.fullRange.left, version);
-            size += Token.serializer.serializedSize(mt.fullRange.right, version);
-
-            size += Hashable.serializer.serializedSize(mt.root, version);
-            return size;
-        }
+        this(new OnHeapLeaf(), partitioner, range, hashdepth, maxsize, 1);
     }
 
     /**
@@ -145,60 +120,56 @@ public class MerkleTree implements Serializable
      * @param hashdepth The maximum depth of the tree. 100/(2^depth) is the %
      *        of the key space covered by each subrange of a fully populated tree.
      * @param maxsize The maximum number of subranges in the tree.
+     * @param size The size of the tree. Typically 1, unless deserilized from an existing tree
      */
-    public MerkleTree(IPartitioner partitioner, Range<Token> range, byte hashdepth, long maxsize)
+    private MerkleTree(Node root, IPartitioner partitioner, Range<Token> range, int hashdepth, long maxsize, long size)
     {
         assert hashdepth < Byte.MAX_VALUE;
+
+        this.root = root;
         this.fullRange = Preconditions.checkNotNull(range);
         this.partitioner = Preconditions.checkNotNull(partitioner);
         this.hashdepth = hashdepth;
         this.maxsize = maxsize;
-
-        size = 1;
-        root = new Leaf(null);
-    }
-
-
-    static byte inc(byte in)
-    {
-        assert in < Byte.MAX_VALUE;
-        return (byte)(in + 1);
+        this.size = size;
     }
 
     /**
      * Initializes this tree by splitting it until hashdepth is reached,
      * or until an additional level of splits would violate maxsize.
      *
-     * NB: Replaces all nodes in the tree.
+     * NB: Replaces all nodes in the tree, and always builds on the heap
      */
     public void init()
     {
         // determine the depth to which we can safely split the tree
-        byte sizedepth = (byte)(Math.log10(maxsize) / Math.log10(2));
-        byte depth = (byte)Math.min(sizedepth, hashdepth);
+        int sizedepth = (int) (Math.log10(maxsize) / Math.log10(2));
+        int depth = Math.min(sizedepth, hashdepth);
+
+        root = initHelper(fullRange.left, fullRange.right, 0, depth);
+        size = (long) Math.pow(2, depth);
+    }
 
-        root = initHelper(fullRange.left, fullRange.right, (byte)0, depth);
-        size = (long)Math.pow(2, depth);
+    public void release()
+    {
+        if (root instanceof OffHeapNode)
+            ((OffHeapNode) root).release();
+        root = null;
     }
 
-    private Hashable initHelper(Token left, Token right, byte depth, byte max)
+    private OnHeapNode initHelper(Token left, Token right, int depth, int max)
     {
         if (depth == max)
             // we've reached the leaves
-            return new Leaf();
+            return new OnHeapLeaf();
         Token midpoint = partitioner.midpoint(left, right);
 
         if (midpoint.equals(left) || midpoint.equals(right))
-            return new Leaf();
-
-        Hashable lchild =  initHelper(left, midpoint, inc(depth), max);
-        Hashable rchild =  initHelper(midpoint, right, inc(depth), max);
-        return new Inner(midpoint, lchild, rchild);
-    }
+            return new OnHeapLeaf();
 
-    Hashable root()
-    {
-        return root;
+        OnHeapNode leftChild = initHelper(left, midpoint, depth + 1, max);
+        OnHeapNode rightChild = initHelper(midpoint, right, depth + 1, max);
+        return new OnHeapInner(midpoint, leftChild, rightChild);
     }
 
     public IPartitioner partitioner()
@@ -233,20 +204,17 @@ public class MerkleTree implements Serializable
     public static List<TreeRange> difference(MerkleTree ltree, MerkleTree rtree)
     {
         if (!ltree.fullRange.equals(rtree.fullRange))
-            throw new IllegalArgumentException("Difference only make sense on tree covering the same range (but " + ltree.fullRange + " != " + rtree.fullRange + ")");
+            throw new IllegalArgumentException("Difference only make sense on tree covering the same range (but " + ltree.fullRange + " != " + rtree.fullRange + ')');
 
         List<TreeRange> diff = new ArrayList<>();
-        TreeDifference active = new TreeDifference(ltree.fullRange.left, ltree.fullRange.right, (byte)0);
+        TreeRange active = new TreeRange(ltree.fullRange.left, ltree.fullRange.right, 0);
 
-        Hashable lnode = ltree.find(active);
-        Hashable rnode = rtree.find(active);
-        byte[] lhash = lnode.hash();
-        byte[] rhash = rnode.hash();
-        active.setSize(lnode.sizeOfRange(), rnode.sizeOfRange());
+        Node lnode = ltree.find(active);
+        Node rnode = rtree.find(active);
 
-        if (lhash != null && rhash != null && !Arrays.equals(lhash, rhash))
+        if (lnode.hashesDiffer(rnode))
         {
-            if(lnode instanceof  Leaf || rnode instanceof Leaf)
+            if (lnode instanceof Leaf || rnode instanceof Leaf)
             {
                 logger.debug("Digest mismatch detected among leaf nodes {}, {}", lnode, rnode);
                 diff.add(active);
@@ -261,14 +229,12 @@ public class MerkleTree implements Serializable
                 }
             }
         }
-        else if (lhash == null || rhash == null)
-            diff.add(active);
+
         return diff;
     }
 
     /**
-     * TODO: This function could be optimized into a depth first traversal of
-     * the two trees in parallel.
+     * TODO: This function could be optimized into a depth first traversal of the two trees in parallel.
      *
      * Takes two trees and a range for which they have hashes, but are inconsistent.
      * @return FULLY_INCONSISTENT if active is inconsistent, PARTIALLY_INCONSISTENT if only a subrange is inconsistent.
@@ -289,54 +255,39 @@ public class MerkleTree implements Serializable
             return FULLY_INCONSISTENT;
         }
 
-        TreeDifference left = new TreeDifference(active.left, midpoint, inc(active.depth));
-        TreeDifference right = new TreeDifference(midpoint, active.right, inc(active.depth));
+        TreeRange left = new TreeRange(active.left, midpoint, active.depth + 1);
+        TreeRange right = new TreeRange(midpoint, active.right, active.depth + 1);
         logger.debug("({}) Hashing sub-ranges [{}, {}] for {} divided by midpoint {}", active.depth, left, right, active, midpoint);
-        byte[] lhash, rhash;
-        Hashable lnode, rnode;
+        Node lnode, rnode;
 
         // see if we should recurse left
         lnode = ltree.find(left);
         rnode = rtree.find(left);
-        lhash = lnode.hash();
-        rhash = rnode.hash();
-        left.setSize(lnode.sizeOfRange(), rnode.sizeOfRange());
-        left.setRows(lnode.rowsInRange(), rnode.rowsInRange());
 
         int ldiff = CONSISTENT;
-        boolean lreso = lhash != null && rhash != null;
-        if (lreso && !Arrays.equals(lhash, rhash))
+        if (lnode.hashesDiffer(rnode))
         {
             logger.debug("({}) Inconsistent digest on left sub-range {}: [{}, {}]", active.depth, left, lnode, rnode);
-            if (lnode instanceof Leaf) ldiff = FULLY_INCONSISTENT;
-            else ldiff = differenceHelper(ltree, rtree, diff, left);
-        }
-        else if (!lreso)
-        {
-            logger.debug("({}) Left sub-range fully inconsistent {}", active.depth, left);
-            ldiff = FULLY_INCONSISTENT;
+
+            if (lnode instanceof Leaf)
+                ldiff = FULLY_INCONSISTENT;
+            else
+                ldiff = differenceHelper(ltree, rtree, diff, left);
         }
 
         // see if we should recurse right
         lnode = ltree.find(right);
         rnode = rtree.find(right);
-        lhash = lnode.hash();
-        rhash = rnode.hash();
-        right.setSize(lnode.sizeOfRange(), rnode.sizeOfRange());
-        right.setRows(lnode.rowsInRange(), rnode.rowsInRange());
 
         int rdiff = CONSISTENT;
-        boolean rreso = lhash != null && rhash != null;
-        if (rreso && !Arrays.equals(lhash, rhash))
+        if (lnode.hashesDiffer(rnode))
         {
             logger.debug("({}) Inconsistent digest on right sub-range {}: [{}, {}]", active.depth, right, lnode, rnode);
-            if (rnode instanceof Leaf) rdiff = FULLY_INCONSISTENT;
-            else rdiff = differenceHelper(ltree, rtree, diff, right);
-        }
-        else if (!rreso)
-        {
-            logger.debug("({}) Right sub-range fully inconsistent {}", active.depth, right);
-            rdiff = FULLY_INCONSISTENT;
+
+            if (rnode instanceof Leaf)
+                rdiff = FULLY_INCONSISTENT;
+            else
+                rdiff = differenceHelper(ltree, rtree, diff, right);
         }
 
         if (ldiff == FULLY_INCONSISTENT && rdiff == FULLY_INCONSISTENT)
@@ -367,32 +318,36 @@ public class MerkleTree implements Serializable
      */
     public TreeRange get(Token t)
     {
-        return getHelper(root, fullRange.left, fullRange.right, (byte)0, t);
+        return getHelper(root, fullRange.left, fullRange.right, t);
     }
 
-    TreeRange getHelper(Hashable hashable, Token pleft, Token pright, byte depth, Token t)
+    private TreeRange getHelper(Node node, Token pleft, Token pright, Token t)
     {
+        int depth = 0;
+
         while (true)
         {
-            if (hashable instanceof Leaf)
+            if (node instanceof Leaf)
             {
                 // we've reached a hash: wrap it up and deliver it
-                return new TreeRange(this, pleft, pright, depth, hashable);
+                return new TreeRange(this, pleft, pright, depth, node);
             }
-            // else: node.
-
-            Inner node = (Inner) hashable;
-            depth = inc(depth);
-            if (Range.contains(pleft, node.token, t))
-            { // left child contains token
-                hashable = node.lchild;
-                pright = node.token;
+
+            assert node instanceof Inner;
+            Inner inner = (Inner) node;
+
+            if (Range.contains(pleft, inner.token(), t)) // left child contains token
+            {
+                pright = inner.token();
+                node = inner.left();
             }
-            else
-            { // else: right child contains token
-                hashable = node.rchild;
-                pleft = node.token;
+            else // right child contains token
+            {
+                pleft = inner.token();
+                node = inner.right();
             }
+
+            depth++;
         }
     }
 
@@ -405,20 +360,21 @@ public class MerkleTree implements Serializable
         invalidateHelper(root, fullRange.left, t);
     }
 
-    private void invalidateHelper(Hashable hashable, Token pleft, Token t)
+    private void invalidateHelper(Node node, Token pleft, Token t)
     {
-        hashable.hash(null);
-        if (hashable instanceof Leaf)
+        node.hash(EMPTY_HASH);
+
+        if (node instanceof Leaf)
             return;
-        // else: node.
 
-        Inner node = (Inner)hashable;
-        if (Range.contains(pleft, node.token, t))
-            // left child contains token
-            invalidateHelper(node.lchild, pleft, t);
+        assert node instanceof Inner;
+        Inner inner = (Inner) node;
+        // TODO: reset computed flag on OnHeapInners
+
+        if (Range.contains(pleft, inner.token(), t))
+            invalidateHelper(inner.left(), pleft, t); // left child contains token
         else
-            // right child contains token
-            invalidateHelper(node.rchild, node.token, t);
+            invalidateHelper(inner.right(), inner.token(), t); // right child contains token
     }
 
     /**
@@ -428,67 +384,106 @@ public class MerkleTree implements Serializable
      * NB: Currently does not support wrapping ranges that do not end with
      * partitioner.getMinimumToken().
      *
-     * @return Null if any subrange of the range is invalid, or if the exact
+     * @return {@link #EMPTY_HASH} if any subrange of the range is invalid, or if the exact
      *         range cannot be calculated using this tree.
      */
+    @VisibleForTesting
     public byte[] hash(Range<Token> range)
     {
         return find(range).hash();
     }
 
     /**
-     * Find the {@link Hashable} node that matches the given {@code range}.
+     * Exceptions that stop recursion early when we are sure that no answer
+     * can be found.
+     */
+    static abstract class StopRecursion extends Exception
+    {
+        static class  TooDeep extends StopRecursion {}
+        static class BadRange extends StopRecursion {}
+    }
+
+    /**
+     * Find the {@link Node} node that matches the given {@code range}.
      *
      * @param range Range to find
-     * @return {@link Hashable} found. If nothing found, return {@link Leaf} with null hash.
+     * @return {@link Node} found. If nothing found, return {@link Leaf} with empty hash.
      */
-    private Hashable find(Range<Token> range)
+    private Node find(Range<Token> range)
+    {
+        try
+        {
+            return findHelper(root, new Range<>(fullRange.left, fullRange.right), range);
+        }
+        catch (StopRecursion e)
+        {
+            return new OnHeapLeaf();
+        }
+    }
+
+    interface Consumer<E extends Exception>
+    {
+        void accept(Node node) throws E;
+    }
+
+    @VisibleForTesting
+    <E extends Exception> boolean ifHashesRange(Range<Token> range, Consumer<E> consumer) throws E
     {
         try
         {
-            return findHelper(root, new Range<Token>(fullRange.left, fullRange.right), range);
+            Node node = findHelper(root, new Range<>(fullRange.left, fullRange.right), range);
+            boolean hasHash = !node.hasEmptyHash();
+            if (hasHash)
+                consumer.accept(node);
+            return hasHash;
         }
         catch (StopRecursion e)
         {
-            return new Leaf();
+            return false;
         }
     }
 
+    @VisibleForTesting
+    boolean hashesRange(Range<Token> range)
+    {
+        return ifHashesRange(range, n -> {});
+    }
+
     /**
      * @throws StopRecursion If no match could be found for the range.
      */
-    private Hashable findHelper(Hashable current, Range<Token> activeRange, Range<Token> find) throws StopRecursion
+    private Node findHelper(Node current, Range<Token> activeRange, Range<Token> find) throws StopRecursion
     {
         while (true)
         {
             if (current instanceof Leaf)
             {
                 if (!find.contains(activeRange))
-                    // we are not fully contained in this range!
-                    throw new StopRecursion.BadRange();
+                    throw new StopRecursion.BadRange(); // we are not fully contained in this range!
+
                 return current;
             }
-            // else: node.
 
-            Inner node = (Inner) current;
-            Range<Token> leftRange = new Range<>(activeRange.left, node.token);
-            Range<Token> rightRange = new Range<>(node.token, activeRange.right);
+            assert current instanceof Inner;
+            Inner inner = (Inner) current;
 
-            if (find.contains(activeRange))
-                // this node is fully contained in the range
-                return node.calc();
+            Range<Token> leftRange  = new Range<>(activeRange.left, inner.token());
+            Range<Token> rightRange = new Range<>(inner.token(), activeRange.right);
+
+            if (find.contains(activeRange)) // this node is fully contained in the range
+                return inner.compute();
 
             // else: one of our children contains the range
 
-            if (leftRange.contains(find))
-            { // left child contains/matches the range
-                current = node.lchild;
+            if (leftRange.contains(find)) // left child contains/matches the range
+            {
                 activeRange = leftRange;
+                current = inner.left();
             }
-            else if (rightRange.contains(find))
-            { // right child contains/matches the range
-                current = node.rchild;
+            else if (rightRange.contains(find)) // right child contains/matches the range
+            {
                 activeRange = rightRange;
+                current = inner.right();
             }
             else
             {
@@ -506,12 +501,12 @@ public class MerkleTree implements Serializable
      */
     public boolean split(Token t)
     {
-        if (!(size < maxsize))
+        if (size >= maxsize)
             return false;
 
         try
         {
-            root = splitHelper(root, fullRange.left, fullRange.right, (byte)0, t);
+            root = splitHelper(root, fullRange.left, fullRange.right, 0, t);
         }
         catch (StopRecursion.TooDeep e)
         {
@@ -520,12 +515,12 @@ public class MerkleTree implements Serializable
         return true;
     }
 
-    private Hashable splitHelper(Hashable hashable, Token pleft, Token pright, byte depth, Token t) throws StopRecursion.TooDeep
+    private OnHeapNode splitHelper(Node node, Token pleft, Token pright, int depth, Token t) throws StopRecursion.TooDeep
     {
         if (depth >= hashdepth)
             throw new StopRecursion.TooDeep();
 
-        if (hashable instanceof Leaf)
+        if (node instanceof Leaf)
         {
             Token midpoint = partitioner.midpoint(pleft, pright);
 
@@ -536,47 +531,47 @@ public class MerkleTree implements Serializable
 
             // split
             size++;
-            return new Inner(midpoint, new Leaf(), new Leaf());
+            return new OnHeapInner(midpoint, new OnHeapLeaf(), new OnHeapLeaf());
         }
         // else: node.
 
         // recurse on the matching child
-        Inner node = (Inner)hashable;
+        assert node instanceof OnHeapInner;
+        OnHeapInner inner = (OnHeapInner) node;
 
-        if (Range.contains(pleft, node.token, t))
-            // left child contains token
-            node.lchild(splitHelper(node.lchild, pleft, node.token, inc(depth), t));
-        else
-            // else: right child contains token
-            node.rchild(splitHelper(node.rchild, node.token, pright, inc(depth), t));
-        return node;
+        if (Range.contains(pleft, inner.token(), t)) // left child contains token
+            inner.left(splitHelper(inner.left(), pleft, inner.token(), depth + 1, t));
+        else // else: right child contains token
+            inner.right(splitHelper(inner.right(), inner.token(), pright, depth + 1, t));
+
+        return inner;
     }
 
     /**
      * Returns a lazy iterator of invalid TreeRanges that need to be filled
      * in order to make the given Range valid.
      */
-    public TreeRangeIterator invalids()
+    TreeRangeIterator rangeIterator()
     {
         return new TreeRangeIterator(this);
     }
 
-    public EstimatedHistogram histogramOfRowSizePerLeaf()
+    EstimatedHistogram histogramOfRowSizePerLeaf()
     {
         HistogramBuilder histbuild = new HistogramBuilder();
         for (TreeRange range : new TreeRangeIterator(this))
         {
-            histbuild.add(range.hashable.sizeOfRange);
+            histbuild.add(range.node.sizeOfRange());
         }
         return histbuild.buildWithStdevRangesAroundMean();
     }
 
-    public EstimatedHistogram histogramOfRowCountPerLeaf()
+    EstimatedHistogram histogramOfRowCountPerLeaf()
     {
         HistogramBuilder histbuild = new HistogramBuilder();
         for (TreeRange range : new TreeRangeIterator(this))
         {
-            histbuild.add(range.hashable.rowsInRange);
+            histbuild.add(range.node.partitionsInRange());
         }
         return histbuild.buildWithStdevRangesAroundMean();
     }
@@ -586,7 +581,7 @@ public class MerkleTree implements Serializable
         long count = 0;
         for (TreeRange range : new TreeRangeIterator(this))
         {
-            count += range.hashable.rowsInRange;
+            count += range.node.partitionsInRange();
         }
         return count;
     }
@@ -597,61 +592,23 @@ public class MerkleTree implements Serializable
         StringBuilder buff = new StringBuilder();
         buff.append("#<MerkleTree root=");
         root.toString(buff, 8);
-        buff.append(">");
+        buff.append('>');
         return buff.toString();
     }
 
-    public static class TreeDifference extends TreeRange
+    @Override
+    public boolean equals(Object other)
     {
-        private static final long serialVersionUID = 6363654174549968183L;
-
-        private long sizeOnLeft;
-        private long sizeOnRight;
-        private long rowsOnLeft;
-        private long rowsOnRight;
-
-        void setSize(long sizeOnLeft, long sizeOnRight)
-        {
-            this.sizeOnLeft = sizeOnLeft;
-            this.sizeOnRight = sizeOnRight;
-        }
-
-        void setRows(long rowsOnLeft, long rowsOnRight)
-        {
-            this.rowsOnLeft = rowsOnLeft;
-            this.rowsOnRight = rowsOnRight;
-        }
-
-        public long sizeOnLeft()
-        {
-            return sizeOnLeft;
-        }
-
-        public long sizeOnRight()
-        {
-            return sizeOnRight;
-        }
-
-        public long rowsOnLeft()
-        {
-            return rowsOnLeft;
-        }
-
-        public long rowsOnRight()
-        {
-            return rowsOnRight;
-        }
-
-        public TreeDifference(Token left, Token right, byte depth)
-        {
-            super(null, left, right, depth, null);
-        }
-
-        public long totalRows()
-        {
-            return rowsOnLeft + rowsOnRight;
-        }
-
+        if (!(other instanceof MerkleTree))
+            return false;
+        MerkleTree that = (MerkleTree) other;
+
+        return this.root.equals(that.root)
+            && this.fullRange.equals(that.fullRange)
+            && this.partitioner == that.partitioner
+            && this.hashdepth == that.hashdepth
+            && this.maxsize == that.maxsize
+            && this.size == that.size;
     }
 
     /**
@@ -664,28 +621,27 @@ public class MerkleTree implements Serializable
      */
     public static class TreeRange extends Range<Token>
     {
-        public static final long serialVersionUID = 1L;
         private final MerkleTree tree;
-        public final byte depth;
-        private final Hashable hashable;
+        public final int depth;
+        private final Node node;
 
-        TreeRange(MerkleTree tree, Token left, Token right, byte depth, Hashable hashable)
+        TreeRange(MerkleTree tree, Token left, Token right, int depth, Node node)
         {
             super(left, right);
             this.tree = tree;
             this.depth = depth;
-            this.hashable = hashable;
+            this.node = node;
         }
 
-        public void hash(byte[] hash)
+        TreeRange(Token left, Token right, int depth)
         {
-            assert tree != null : "Not intended for modification!";
-            hashable.hash(hash);
+            this(null, left, right, depth, null);
         }
 
-        public byte[] hash()
+        public void hash(byte[] hash)
         {
-            return hashable.hash();
+            assert tree != null : "Not intended for modification!";
+            node.hash(hash);
         }
 
         /**
@@ -694,18 +650,9 @@ public class MerkleTree implements Serializable
         public void addHash(RowHash entry)
         {
             assert tree != null : "Not intended for modification!";
-            assert hashable instanceof Leaf;
-
-            hashable.addHash(entry.hash, entry.size);
-        }
-
-        public void ensureHashInitialised()
-        {
-            assert tree != null : "Not intended for modification!";
-            assert hashable instanceof Leaf;
 
-            if (hashable.hash == null)
-                hashable.hash = EMPTY_HASH;
+            assert node instanceof OnHeapLeaf;
+            ((OnHeapLeaf) node).addHash(entry.hash, entry.size);
         }
 
         public void addAll(Iterator<RowHash> entries)
@@ -717,9 +664,7 @@ public class MerkleTree implements Serializable
         @Override
         public String toString()
         {
-            StringBuilder buff = new StringBuilder("#<TreeRange ");
-            buff.append(super.toString()).append(" depth=").append(depth);
-            return buff.append(">").toString();
+            return "#<TreeRange " + super.toString() + " depth=" + depth + '>';
         }
     }
 
@@ -740,8 +685,8 @@ public class MerkleTree implements Serializable
 
         TreeRangeIterator(MerkleTree tree)
         {
-            tovisit = new ArrayDeque<TreeRange>();
-            tovisit.add(new TreeRange(tree, tree.fullRange.left, tree.fullRange.right, (byte)0, tree.root));
+            tovisit = new ArrayDeque<>();
+            tovisit.add(new TreeRange(tree, tree.fullRange.left, tree.fullRange.right, 0, tree.root));
             this.tree = tree;
         }
 
@@ -756,7 +701,7 @@ public class MerkleTree implements Serializable
             {
                 TreeRange active = tovisit.pop();
 
-                if (active.hashable instanceof Leaf)
+                if (active.node instanceof Leaf)
                 {
                     // found a leaf invalid range
                     if (active.isWrapAround() && !tovisit.isEmpty())
@@ -765,9 +710,9 @@ public class MerkleTree implements Serializable
                     return active;
                 }
 
-                Inner node = (Inner)active.hashable;
-                TreeRange left = new TreeRange(tree, active.left, node.token, inc(active.depth), node.lchild);
-                TreeRange right = new TreeRange(tree, node.token, active.right, inc(active.depth), node.rchild);
+                Inner node = (Inner)active.node;
+                TreeRange left = new TreeRange(tree, active.left, node.token(), active.depth + 1, node.left());
+                TreeRange right = new TreeRange(tree, node.token(), active.right, active.depth + 1, node.right());
 
                 if (right.isWrapAround())
                 {
@@ -792,123 +737,357 @@ public class MerkleTree implements Serializable
     }
 
     /**
-     * An inner node in the MerkleTree. Inners can contain cached hash values, which
-     * are the binary hash of their two children.
+     * Hash value representing a row, to be used to pass hashes to the MerkleTree.
+     * The byte[] hash value should contain a digest of the key and value of the row
+     * created using a very strong hash function.
      */
-    static class Inner extends Hashable
+    public static class RowHash
     {
-        public static final long serialVersionUID = 1L;
-        static final byte IDENT = 2;
         public final Token token;
-        private Hashable lchild;
-        private Hashable rchild;
-
-        private static final InnerSerializer serializer = new InnerSerializer();
+        public final byte[] hash;
+        public final long size;
 
-        /**
-         * Constructs an Inner with the given token and children, and a null hash.
-         */
-        public Inner(Token token, Hashable lchild, Hashable rchild)
+        public RowHash(Token token, byte[] hash, long size)
         {
-            super(null);
             this.token = token;
-            this.lchild = lchild;
-            this.rchild = rchild;
+            this.hash  = hash;
+            this.size  = size;
         }
 
-        public Hashable lchild()
+        @Override
+        public String toString()
         {
-            return lchild;
+            return "#<RowHash " + token + ' ' + (hash == null ? "null" : Hex.bytesToHex(hash)) + " @ " + size + " bytes>";
         }
+    }
+
+    public void serialize(DataOutputPlus out, int version) throws IOException
+    {
+        out.writeByte(hashdepth);
+        out.writeLong(maxsize);
+        out.writeLong(size);
+        out.writeUTF(partitioner.getClass().getCanonicalName());
+        Token.serializer.serialize(fullRange.left, out, version);
+        Token.serializer.serialize(fullRange.right, out, version);
+        root.serialize(out, version);
+    }
+
+    public long serializedSize(int version)
+    {
+        long size = 1 // mt.hashdepth
+                  + sizeof(maxsize)
+                  + sizeof(this.size)
+                  + sizeof(partitioner.getClass().getCanonicalName());
+        size += Token.serializer.serializedSize(fullRange.left, version);
+        size += Token.serializer.serializedSize(fullRange.right, version);
+        size += root.serializedSize(version);
+        return size;
+    }
+
+    public static MerkleTree deserialize(DataInputPlus in, int version) throws IOException
+    {
+        return deserialize(in, DatabaseDescriptor.getOffheapMerkleTreesEnabled(), version);
+    }
+
+    public static MerkleTree deserialize(DataInputPlus in, boolean offHeapRequested, int version) throws IOException
+    {
+        int hashDepth = in.readByte();
+        long maxSize = in.readLong();
+        int innerNodeCount = Ints.checkedCast(in.readLong());
 
-        public Hashable rchild()
+        IPartitioner partitioner;
+        try
+        {
+            partitioner = FBUtilities.newPartitioner(in.readUTF());
+        }
+        catch (ConfigurationException e)
         {
-            return rchild;
+            throw new IOException(e);
         }
 
-        public void lchild(Hashable child)
+        Token left = Token.serializer.deserialize(in, partitioner, version);
+        Token right = Token.serializer.deserialize(in, partitioner, version);
+        Range<Token> fullRange = new Range<>(left, right);
+        Node root = deserializeTree(in, partitioner, innerNodeCount, offHeapRequested, version);
+        return new MerkleTree(root, partitioner, fullRange, hashDepth, maxSize, innerNodeCount);
+    }
+
+    private static boolean warnedOnce;
+    private static Node deserializeTree(DataInputPlus in, IPartitioner partitioner, int innerNodeCount, boolean offHeapRequested, int version) throws IOException
+    {
+        boolean offHeapSupported = partitioner instanceof Murmur3Partitioner || partitioner instanceof RandomPartitioner;
+
+        if (offHeapRequested && !offHeapSupported && !warnedOnce)
         {
-            lchild = child;
+            logger.warn("Configuration requests offheap memtables, but partitioner does not support it. Ignoring.");
+            warnedOnce = true;
         }
 
-        public void rchild(Hashable child)
+        return offHeapRequested && offHeapSupported
+             ? deserializeOffHeap(in, partitioner, innerNodeCount, version)
+             : OnHeapNode.deserialize(in, partitioner, version);
+    }
+
+    /*
+     * Coordinating multiple trees from multiple replicas can get expensive.
+     * On the deserialization path, we know in advance what the tree looks like,
+     * So we can pre-size an offheap buffer and deserialize into that.
+     */
+
+    MerkleTree tryMoveOffHeap() throws IOException
+    {
+        boolean offHeapEnabled = DatabaseDescriptor.getOffheapMerkleTreesEnabled();
+        boolean offHeapSupported = partitioner instanceof Murmur3Partitioner || partitioner instanceof RandomPartitioner;
+
+        if (offHeapEnabled && !offHeapSupported && !warnedOnce)
         {
-            rchild = child;
+            logger.warn("Configuration requests offheap memtables, but partitioner does not support it. Ignoring.");
+            warnedOnce = true;
         }
 
-        Hashable calc()
+        return root instanceof OnHeapNode && offHeapEnabled && offHeapSupported ? moveOffHeap() : this;
+    }
+
+    private MerkleTree moveOffHeap() throws IOException
+    {
+        assert root instanceof OnHeapNode;
+        int bufferSize = offHeapBufferSize(Ints.checkedCast(size), partitioner);
+        logger.debug("Allocating direct buffer of size {} to move merkle tree off heap", bufferSize);
+        ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
+        int pointer = ((OnHeapNode) root).serializeOffHeap(buffer, partitioner);
+        OffHeapNode newRoot = fromPointer(pointer, buffer, partitioner).attachRef();
+        return new MerkleTree(newRoot, partitioner, fullRange, hashdepth, maxsize, size);
+    }
+
+    private static OffHeapNode deserializeOffHeap(DataInputPlus in, IPartitioner partitioner, int innerNodeCount, int version) throws IOException
+    {
+        int bufferSize = offHeapBufferSize(innerNodeCount, partitioner);
+        logger.debug("Allocating direct buffer of size {} for merkle tree deserialization", bufferSize);
+        ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
+        int pointer = OffHeapNode.deserialize(in, buffer, partitioner, version);
+        return fromPointer(pointer, buffer, partitioner).attachRef();
+    }
+
+    private static OffHeapNode fromPointer(int pointer, ByteBuffer buffer, IPartitioner partitioner)
+    {
+        return pointer >= 0 ? new OffHeapInner(buffer, pointer, partitioner) : new OffHeapLeaf(buffer, ~pointer);
+    }
+
+    private static int offHeapBufferSize(int innerNodeCount, IPartitioner partitioner)
+    {
+        return innerNodeCount * OffHeapInner.maxOffHeapSize(partitioner) + (innerNodeCount + 1) * OffHeapLeaf.maxOffHeapSize();
+    }
+
+    interface Node
+    {
+        byte[] hash();
+
+        boolean hasEmptyHash();
+
+        void hash(byte[] hash);
+
+        boolean hashesDiffer(Node other);
+
+        default Node compute()
         {
-            if (hash == null)
-            {
-                // hash and size haven't been calculated; calc children then compute
-                Hashable lnode = lchild.calc();
-                Hashable rnode = rchild.calc();
-                // cache the computed value
-                hash(lnode.hash, rnode.hash);
-                sizeOfRange = lnode.sizeOfRange + rnode.sizeOfRange;
-                rowsInRange = lnode.rowsInRange + rnode.rowsInRange;
-            }
             return this;
         }
 
-        /**
-         * Recursive toString.
-         */
-        public void toString(StringBuilder buff, int maxdepth)
+        default long sizeOfRange()
         {
-            buff.append("#<").append(getClass().getSimpleName());
-            buff.append(" ").append(token);
-            buff.append(" hash=").append(Hashable.toString(hash()));
-            buff.append(" children=[");
-            if (maxdepth < 1)
-            {
-                buff.append("#");
-            }
-            else
-            {
-                if (lchild == null)
-                    buff.append("null");
-                else
-                    lchild.toString(buff, maxdepth-1);
-                buff.append(" ");
-                if (rchild == null)
-                    buff.append("null");
-                else
-                    rchild.toString(buff, maxdepth-1);
-            }
-            buff.append("]>");
+            return 0;
+        }
+
+        default long partitionsInRange()
+        {
+            return 0;
+        }
+
+        void serialize(DataOutputPlus out, int version) throws IOException;
+        int serializedSize(int version);
+
+        void toString(StringBuilder buff, int maxdepth);
+
+        static String toString(byte[] hash)
+        {
+            return hash == null
+                 ? "null"
+                 : '[' + Hex.bytesToHex(hash) + ']';
+        }
+
+        boolean equals(Node node);
+    }
+
+    static abstract class OnHeapNode implements Node
+    {
+        long sizeOfRange;
+        long partitionsInRange;
+
+        protected byte[] hash;
+
+        OnHeapNode(byte[] hash)
+        {
+            if (hash == null)
+                throw new IllegalArgumentException();
+
+            this.hash = hash;
+        }
+
+        public byte[] hash()
+        {
+            return hash;
+        }
+
+        public boolean hasEmptyHash()
+        {
+            //noinspection ArrayEquality
+            return hash == EMPTY_HASH;
+        }
+
+        public void hash(byte[] hash)
+        {
+            if (hash == null)
+                throw new IllegalArgumentException();
+
+            this.hash = hash;
+        }
+
+        public boolean hashesDiffer(Node other)
+        {
+            return other instanceof OnHeapNode
+                 ? hashesDiffer( (OnHeapNode) other)
+                 : hashesDiffer((OffHeapNode) other);
+        }
+
+        private boolean hashesDiffer(OnHeapNode other)
+        {
+            return !Arrays.equals(hash(), other.hash());
+        }
+
+        private boolean hashesDiffer(OffHeapNode other)
+        {
+            return compare(hash(), other.buffer(), other.hashBytesOffset(), HASH_SIZE) != 0;
         }
 
         @Override
-        public String toString()
+        public long sizeOfRange()
         {
-            StringBuilder buff = new StringBuilder();
-            toString(buff, 1);
-            return buff.toString();
+            return sizeOfRange;
         }
 
-        private static class InnerSerializer implements IPartitionerDependentSerializer<Inner>
+        @Override
+        public long partitionsInRange()
         {
-            public void serialize(Inner inner, DataOutputPlus out, int version) throws IOException
-            {
-                Token.serializer.serialize(inner.token, out, version);
-                Hashable.serializer.serialize(inner.lchild, out, version);
-                Hashable.serializer.serialize(inner.rchild, out, version);
-            }
+            return partitionsInRange;
+        }
 
-            public Inner deserialize(DataInput in, IPartitioner p, int version) throws IOException
+        static OnHeapNode deserialize(DataInputPlus in, IPartitioner p, int version) throws IOException
+        {
+            byte ident = in.readByte();
+
+            switch (ident)
             {
-                Token token = Token.serializer.deserialize(in, p, version);
-                Hashable lchild = Hashable.serializer.deserialize(in, p, version);
-                Hashable rchild = Hashable.serializer.deserialize(in, p, version);
-                return new Inner(token, lchild, rchild);
+                case Inner.IDENT:
+                    return OnHeapInner.deserializeWithoutIdent(in, p, version);
+                case Leaf.IDENT:
+                    return OnHeapLeaf.deserializeWithoutIdent(in);
+                default:
+                    throw new IOException("Unexpected node type: " + ident);
             }
+        }
+
+        abstract int serializeOffHeap(ByteBuffer buffer, IPartitioner p) throws IOException;
+    }
+
+    static abstract class OffHeapNode implements Node
+    {
+        protected final ByteBuffer buffer;
+        protected final int offset;
+
+        OffHeapNode(ByteBuffer buffer, int offset)
+        {
+            this.buffer = buffer;
+            this.offset = offset;
+        }
+
+        ByteBuffer buffer()
+        {
+            return buffer;
+        }
+
+        public byte[] hash()
+        {
+            final int position = buffer.position();
+            buffer.position(hashBytesOffset());
+            byte[] array = new byte[HASH_SIZE];
+            buffer.get(array);
+            buffer.position(position);
+            return array;
+        }
+
+        public boolean hasEmptyHash()
+        {
+            return compare(buffer(), hashBytesOffset(), HASH_SIZE, EMPTY_HASH) == 0;
+        }
+
+        public void hash(byte[] hash)
+        {
+            throw new UnsupportedOperationException();
+        }
 
-            public long serializedSize(Inner inner, int version)
+        public boolean hashesDiffer(Node other)
+        {
+            return other instanceof OnHeapNode
+                 ? hashesDiffer((OnHeapNode) other)
+                 : hashesDiffer((OffHeapNode) other);
+        }
+
+        private boolean hashesDiffer(OnHeapNode other)
+        {
+            return compare(buffer(), hashBytesOffset(), HASH_SIZE, other.hash()) != 0;
+        }
+
+        private boolean hashesDiffer(OffHeapNode other)
+        {
+            int thisOffset = hashBytesOffset();
+            int otherOffset = other.hashBytesOffset();
+
+            for (int i = 0; i < HASH_SIZE; i += 8)
+                if (buffer().getLong(thisOffset + i) != other.buffer().getLong(otherOffset + i))
+                    return true;
+
+            return false;
+        }
+
+        OffHeapNode attachRef()
+        {
+            if (Ref.DEBUG_ENABLED)
+                MemoryUtil.setAttachment(buffer, new Ref<>(this, null));
+            return this;
+        }
+
+        void release()
+        {
+            Object attachment = MemoryUtil.getAttachment(buffer);
+            if (attachment instanceof Ref)
+                ((Ref) attachment).release();
+            FileUtils.clean(buffer);
+        }
+
+        abstract int hashBytesOffset();
+
+        static int deserialize(DataInputPlus in, ByteBuffer buffer, IPartitioner p, int version) throws IOException
+        {
+            byte ident = in.readByte();
+
+            switch (ident)
             {
-                return Token.serializer.serializedSize(inner.token, version)
-                     + Hashable.serializer.serializedSize(inner.lchild, version)
-                     + Hashable.serializer.serializedSize(inner.rchild, version);
+                case Inner.IDENT:
+                    return OffHeapInner.deserializeWithoutIdent(in, buffer, p, version);
+                case Leaf.IDENT:
+                    return  OffHeapLeaf.deserializeWithoutIdent(in, buffer);
+                default:
+                    throw new IOException("Unexpected node type: " + ident);
             }
         }
     }
@@ -922,237 +1101,440 @@ public class MerkleTree implements Serializable
      * tree extending below the Leaf is generated in memory, but only the root
      * is stored in the Leaf.
      */
-    static class Leaf extends Hashable
+    interface Leaf extends Node
     {
-        public static final long serialVersionUID = 1L;
         static final byte IDENT = 1;
-        private static final LeafSerializer serializer = new LeafSerializer();
+
+        default void serialize(DataOutputPlus out, int version) throws IOException
+        {
+            byte[] hash = hash();
+            assert hash.length == HASH_SIZE;
+
+            out.writeByte(Leaf.IDENT);
+
+            if (!hasEmptyHash())
+            {
+                out.writeByte(HASH_SIZE);
+                out.write(hash);
+            }
+            else
+            {
+                out.writeByte(0);
+            }
+        }
+
+        default int serializedSize(int version)
+        {
+            return 2 + (hasEmptyHash() ? 0 : HASH_SIZE);
+        }
+
+        default void toString(StringBuilder buff, int maxdepth)
+        {
+            buff.append(toString());
+        }
+
+        default boolean equals(Node other)
+        {
+            return other instanceof Leaf && !hashesDiffer(other);
+        }
+    }
+
+    static class OnHeapLeaf extends OnHeapNode implements Leaf
+    {
+        OnHeapLeaf()
+        {
+            super(EMPTY_HASH);
+        }
+
+        OnHeapLeaf(byte[] hash)
+        {
+            super(hash);
+        }
 
         /**
-         * Constructs a null hash.
+         * Mixes the given value into our hash. If our hash is null,
+         * our hash will become the given value.
          */
-        public Leaf()
+        void addHash(byte[] partitionHash, long partitionSize)
         {
-            super(null);
+            if (hasEmptyHash())
+                hash(partitionHash);
+            else
+                FBUtilities.xorOntoLeft(hash, partitionHash);
+
+            sizeOfRange += partitionSize;
+            partitionsInRange += 1;
         }
 
-        public Leaf(byte[] hash)
+        static OnHeapLeaf deserializeWithoutIdent(DataInputPlus in) throws IOException
         {
-            super(hash);
+            if (in.readByte() > 0)
+            {
+                byte[] hash = new byte[HASH_SIZE];
+                in.readFully(hash);
+                return new OnHeapLeaf(hash);
+            }
+            else
+            {
+                return new OnHeapLeaf();
+            }
         }
 
-        public void toString(StringBuilder buff, int maxdepth)
+        int serializeOffHeap(ByteBuffer buffer, IPartitioner p)
         {
-            buff.append(toString());
+            if (buffer.remaining() < OffHeapLeaf.maxOffHeapSize())
+                throw new IllegalStateException("Insufficient remaining bytes to deserialize a Leaf node off-heap");
+
+            if (hash.length != HASH_SIZE)
+                throw new IllegalArgumentException("Hash of unexpected size when serializing a Leaf off-heap: " + hash.length);
+
+            final int position = buffer.position();
+            buffer.put(hash);
+            return ~position;
         }
 
         @Override
         public String toString()
         {
-            return "#<Leaf " + Hashable.toString(hash()) + ">";
+            return "#<OnHeapLeaf " + Node.toString(hash()) + '>';
         }
+    }
+
+    static class OffHeapLeaf extends OffHeapNode implements Leaf
+    {
+        static final int HASH_BYTES_OFFSET = 0;
 
-        private static class LeafSerializer implements IPartitionerDependentSerializer<Leaf>
+        OffHeapLeaf(ByteBuffer buffer, int offset)
         {
-            public void serialize(Leaf leaf, DataOutputPlus out, int version) throws IOException
-            {
-                if (leaf.hash == null)
-                {
-                    out.writeByte(-1);
-                }
-                else
-                {
-                    out.writeByte(leaf.hash.length);
-                    out.write(leaf.hash);
-                }
-            }
+            super(buffer, offset);
+        }
+
+        public int hashBytesOffset()
+        {
+            return offset + HASH_BYTES_OFFSET;
+        }
+
+        static int deserializeWithoutIdent(DataInput in, ByteBuffer buffer) throws IOException
+        {
+            if (buffer.remaining() < maxOffHeapSize())
+                throw new IllegalStateException("Insufficient remaining bytes to deserialize a Leaf node off-heap");
 
-            public Leaf deserialize(DataInput in, IPartitioner p, int version) throws IOException
+            final int position = buffer.position();
+
+            int hashLength = in.readByte();
+            if (hashLength > 0)
             {
-                int hashLen = in.readByte();
-                byte[] hash = hashLen < 0 ? null : new byte[hashLen];
-                if (hash != null)
-                    in.readFully(hash);
-                return new Leaf(hash);
-            }
+                if (hashLength != HASH_SIZE)
+                    throw new IllegalStateException("Hash of unexpected size when deserializing an off-heap Leaf node: " + hashLength);
 
-            public long serializedSize(Leaf leaf, int version)
+                byte[] hashBytes = getTempArray(HASH_SIZE);
+                in.readFully(hashBytes, 0, HASH_SIZE);
+                buffer.put(hashBytes, 0, HASH_SIZE);
+            }
+            else
             {
-                long size = 1;
-                if (leaf.hash != null)
-                    size += leaf.hash().length;
-                return size;
+                buffer.put(EMPTY_HASH, 0, HASH_SIZE);
             }
+
+            return ~position;
         }
-    }
 
-    /**
-     * Hash value representing a row, to be used to pass hashes to the MerkleTree.
-     * The byte[] hash value should contain a digest of the key and value of the row
-     * created using a very strong hash function.
-     */
-    public static class RowHash
-    {
-        public final Token token;
-        public final byte[] hash;
-        public final long size;
-        public RowHash(Token token, byte[] hash, long size)
+        static int maxOffHeapSize()
         {
-            this.token = token;
-            this.hash  = hash;
-            this.size = size;
+            return HASH_SIZE;
         }
 
         @Override
         public String toString()
         {
-            return "#<RowHash " + token + " " + Hashable.toString(hash) + " @ " + size + " bytes>";
+            return "#<OffHeapLeaf " + Node.toString(hash()) + '>';
         }
     }
 
     /**
-     * Abstract class containing hashing logic, and containing a single hash field.
+     * An inner node in the MerkleTree. Inners can contain cached hash values, which
+     * are the binary hash of their two children.
      */
-    static abstract class Hashable implements Serializable
+    interface Inner extends Node
     {
-        private static final long serialVersionUID = 1L;
-        private static final IPartitionerDependentSerializer<Hashable> serializer = new HashableSerializer();
+        static final byte IDENT = 2;
 
-        protected byte[] hash;
-        protected long sizeOfRange;
-        protected long rowsInRange;
+        public Token token();
 
-        protected Hashable(byte[] hash)
+        public Node left();
+        public Node right();
+
+        default void serialize(DataOutputPlus out, int version) throws IOException
         {
-            this.hash = hash;
+            out.writeByte(Inner.IDENT);
+            Token.serializer.serialize(token(), out, version);
+            left().serialize(out, version);
+            right().serialize(out, version);
         }
 
-        public byte[] hash()
+        default int serializedSize(int version)
         {
-            return hash;
+            return 1
+                 + (int) Token.serializer.serializedSize(token(), version)
+                 + left().serializedSize(version)
+                 + right().serializedSize(version);
         }
 
-        public long sizeOfRange()
+        default void toString(StringBuilder buff, int maxdepth)
         {
-            return sizeOfRange;
+            buff.append("#<").append(getClass().getSimpleName())
+                .append(' ').append(token())
+                .append(" hash=").append(Node.toString(hash()))
+                .append(" children=[");
+
+            if (maxdepth < 1)
+            {
+                buff.append('#');
+            }
+            else
+            {
+                Node left = left();
+                if (left == null)
+                    buff.append("null");
+                else
+                    left.toString(buff, maxdepth - 1);
+
+                buff.append(' ');
+
+                Node right = right();
+                if (right == null)
+                    buff.append("null");
+                else
+                    right.toString(buff, maxdepth - 1);
+            }
+
+            buff.append("]>");
         }
 
-        public long rowsInRange()
+        default boolean equals(Node other)
         {
-            return rowsInRange;
+            if (!(other instanceof Inner))
+                return false;
+            Inner that = (Inner) other;
+            return !hashesDiffer(other) && this.left().equals(that.left()) && this.right().equals(that.right());
         }
+    }
+
+    static class OnHeapInner extends OnHeapNode implements Inner
+    {
+        private final Token token;
 
-        void hash(byte[] hash)
+        private OnHeapNode left;
+        private OnHeapNode right;
+
+        private boolean computed;
+
+        OnHeapInner(Token token, OnHeapNode left, OnHeapNode right)
         {
-            this.hash = hash;
+            super(EMPTY_HASH);
+
+            this.token = token;
+            this.left = left;
+            this.right = right;
         }
 
-        Hashable calc()
+        public Token token()
         {
-            return this;
+            return token;
         }
 
-        /**
-         * Sets the value of this hash to binaryHash of its children.
-         * @param lefthash Hash of left child.
-         * @param righthash Hash of right child.
-         */
-        void hash(byte[] lefthash, byte[] righthash)
+        public OnHeapNode left()
         {
-            hash = binaryHash(lefthash, righthash);
+            return left;
         }
 
-        /**
-         * Mixes the given value into our hash. If our hash is null,
-         * our hash will become the given value.
-         */
-        void addHash(byte[] righthash, long sizeOfRow)
+        public OnHeapNode right()
         {
-            if (hash == null)
-                hash = righthash;
-            else
-                hash = binaryHash(hash, righthash);
-            this.sizeOfRange += sizeOfRow;
-            this.rowsInRange += 1;
+            return right;
         }
 
-        /**
-         * The primitive with which all hashing should be accomplished: hashes
-         * a left and right value together.
-         */
-        static byte[] binaryHash(final byte[] left, final byte[] right)
+        void left(OnHeapNode child)
         {
-            return FBUtilities.xor(left, right);
+            left = child;
         }
 
-        public abstract void toString(StringBuilder buff, int maxdepth);
-
-        public static String toString(byte[] hash)
+        void right(OnHeapNode child)
         {
-            if (hash == null)
-                return "null";
-            return "[" + Hex.bytesToHex(hash) + "]";
+            right = child;
         }
 
-        private static class HashableSerializer implements IPartitionerDependentSerializer<Hashable>
+        @Override
+        public Node compute()
         {
-            public void serialize(Hashable h, DataOutputPlus out, int version) throws IOException
+            if (!computed) // hash and size haven't been calculated; compute children then compute this
             {
-                if (h instanceof Inner)
-                {
-                    out.writeByte(Inner.IDENT);
-                    Inner.serializer.serialize((Inner)h, out, version);
-                }
-                else if (h instanceof Leaf)
-                {
-                    out.writeByte(Leaf.IDENT);
-                    Leaf.serializer.serialize((Leaf) h, out, version);
-                }
-                else
-                    throw new IOException("Unexpected Hashable: " + h.getClass().getCanonicalName());
-            }
+                left.compute();
+                right.compute();
 
-            public Hashable deserialize(DataInput in, IPartitioner p, int version) throws IOException
-            {
-                byte ident = in.readByte();
-                if (Inner.IDENT == ident)
-                    return Inner.serializer.deserialize(in, p, version);
-                else if (Leaf.IDENT == ident)
-                    return Leaf.serializer.deserialize(in, p, version);
-                else
-                    throw new IOException("Unexpected Hashable: " + ident);
+                if (!left.hasEmptyHash() && !right.hasEmptyHash())
+                    hash = FBUtilities.xor(left.hash(), right.hash());
+                else if (left.hasEmptyHash())
+                    hash = right.hash();
+                else if (right.hasEmptyHash())
+                    hash = left.hash();
+
+                sizeOfRange       = left.sizeOfRange()       + right.sizeOfRange();
+                partitionsInRange = left.partitionsInRange() + right.partitionsInRange();
+
+                computed = true;
             }
 
-            public long serializedSize(Hashable h, int version)
+            return this;
+        }
+
+        static OnHeapInner deserializeWithoutIdent(DataInputPlus in, IPartitioner p, int version) throws IOException
+        {
+            Token token = Token.serializer.deserialize(in, p, version);
+            OnHeapNode  left = OnHeapNode.deserialize(in, p, version);
+            OnHeapNode right = OnHeapNode.deserialize(in, p, version);
+            return new OnHeapInner(token, left, right);
+        }
+
+        int serializeOffHeap(ByteBuffer buffer, IPartitioner partitioner) throws IOException
+        {
+            if (buffer.remaining() < OffHeapInner.maxOffHeapSize(partitioner))
+                throw new IllegalStateException("Insufficient remaining bytes to deserialize Inner node off-heap");
+
+            final int offset = buffer.position();
+
+            int tokenSize = partitioner.getTokenFactory().byteSize(token);
+            buffer.putShort(offset + OffHeapInner.TOKEN_LENGTH_OFFSET, Shorts.checkedCast(tokenSize));
+            buffer.position(offset + OffHeapInner.TOKEN_BYTES_OFFSET);
+            partitioner.getTokenFactory().serialize(token, buffer);
+
+            int  leftPointer =  left.serializeOffHeap(buffer, partitioner);
+            int rightPointer = right.serializeOffHeap(buffer, partitioner);
+
+            buffer.putInt(offset + OffHeapInner.LEFT_CHILD_POINTER_OFFSET,  leftPointer);
+            buffer.putInt(offset + OffHeapInner.RIGHT_CHILD_POINTER_OFFSET, rightPointer);
+
+            int  leftHashOffset = OffHeapInner.hashBytesOffset(leftPointer);
+            int rightHashOffset = OffHeapInner.hashBytesOffset(rightPointer);
+
+            for (int i = 0; i < HASH_SIZE; i += 8)
             {
-                if (h instanceof Inner)
-                    return 1 + Inner.serializer.serializedSize((Inner) h, version);
-                else if (h instanceof Leaf)
-                    return 1 + Leaf.serializer.serializedSize((Leaf) h, version);
-                throw new AssertionError(h.getClass());
+                buffer.putLong(offset + OffHeapInner.HASH_BYTES_OFFSET + i,
+                               buffer.getLong(leftHashOffset  + i) ^ buffer.getLong(rightHashOffset + i));
             }
+
+            return offset;
+        }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder buff = new StringBuilder();
+            toString(buff, 1);
+            return buff.toString();
         }
     }
 
-    /**
-     * Exceptions that stop recursion early when we are sure that no answer
-     * can be found.
-     */
-    static abstract class StopRecursion extends Exception
+    static class OffHeapInner extends OffHeapNode implements Inner
     {
-        static class BadRange extends StopRecursion
+        /**
+         * All we want to keep here is just a pointer to the start of the Inner leaf in the
+         * direct buffer. From there, we'll be able to deserialize the following, in this order:
+         *
+         * 1. pointer to left child (int)
+         * 2. pointer to right child (int)
+         * 3. hash bytes (space allocated as HASH_MAX_SIZE)
+         * 4. token length (short)
+         * 5. token bytes (variable length)
+         */
+        static final int LEFT_CHILD_POINTER_OFFSET  = 0;
+        static final int RIGHT_CHILD_POINTER_OFFSET = 4;
+        static final int HASH_BYTES_OFFSET          = 8;
+        static final int TOKEN_LENGTH_OFFSET        = 8 + HASH_SIZE;
+        static final int TOKEN_BYTES_OFFSET         = TOKEN_LENGTH_OFFSET + 2;
+
+        private final IPartitioner partitioner;
+
+        OffHeapInner(ByteBuffer buffer, int offset, IPartitioner partitioner)
         {
-            public BadRange(){ super(); }
+            super(buffer, offset);
+            this.partitioner = partitioner;
         }
 
-        static class InvalidHash extends StopRecursion
+        public Token token()
         {
-            public InvalidHash(){ super(); }
+            int length = buffer.getShort(offset + TOKEN_LENGTH_OFFSET);
+            return partitioner.getTokenFactory().fromByteBuffer(buffer, offset + TOKEN_BYTES_OFFSET, length);
         }
 
-        static class TooDeep extends StopRecursion
+        public Node left()
         {
-            public TooDeep(){ super(); }
+            int pointer = buffer.getInt(offset + LEFT_CHILD_POINTER_OFFSET);
+            return pointer >= 0 ? new OffHeapInner(buffer, pointer, partitioner) : new OffHeapLeaf(buffer, ~pointer);
+        }
+
+        public Node right()
+        {
+            int pointer = buffer.getInt(offset + RIGHT_CHILD_POINTER_OFFSET);
+            return pointer >= 0 ? new OffHeapInner(buffer, pointer, partitioner) : new OffHeapLeaf(buffer, ~pointer);
+        }
+
+        public int hashBytesOffset()
+        {
+            return offset + HASH_BYTES_OFFSET;
+        }
+
+        static int deserializeWithoutIdent(DataInputPlus in, ByteBuffer buffer, IPartitioner partitioner, int version) throws IOException
+        {
+            if (buffer.remaining() < maxOffHeapSize(partitioner))
+                throw new IllegalStateException("Insufficient remaining bytes to deserialize Inner node off-heap");
+
+            final int offset = buffer.position();
+
+            int tokenSize = Token.serializer.deserializeSize(in);
+            byte[] tokenBytes = getTempArray(tokenSize);
+            in.readFully(tokenBytes, 0, tokenSize);
+
+            buffer.putShort(offset + OffHeapInner.TOKEN_LENGTH_OFFSET, Shorts.checkedCast(tokenSize));
+            buffer.position(offset + OffHeapInner.TOKEN_BYTES_OFFSET);
+            buffer.put(tokenBytes, 0, tokenSize);
+
+            int leftPointer  = OffHeapNode.deserialize(in, buffer, partitioner, version);
+            int rightPointer = OffHeapNode.deserialize(in, buffer, partitioner, version);
+
+            buffer.putInt(offset + OffHeapInner.LEFT_CHILD_POINTER_OFFSET,  leftPointer);
+            buffer.putInt(offset + OffHeapInner.RIGHT_CHILD_POINTER_OFFSET, rightPointer);
+
+            int leftHashOffset  = hashBytesOffset(leftPointer);
+            int rightHashOffset = hashBytesOffset(rightPointer);
+
+            for (int i = 0; i < HASH_SIZE; i += 8)
+            {
+                buffer.putLong(offset + OffHeapInner.HASH_BYTES_OFFSET + i,
+                               buffer.getLong(leftHashOffset  + i) ^ buffer.getLong(rightHashOffset + i));
+            }
+
+            return offset;
+        }
+
+        static int maxOffHeapSize(IPartitioner partitioner)
+        {
+            return 4 // left pointer
+                 + 4 // right pointer
+                 + HASH_SIZE
+                 + 2 + partitioner.getMaxTokenSize();
+        }
+
+        static int hashBytesOffset(int pointer)
+        {
+            return pointer >= 0 ? pointer + OffHeapInner.HASH_BYTES_OFFSET : ~pointer + OffHeapLeaf.HASH_BYTES_OFFSET;
+        }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder buff = new StringBuilder();
+            toString(buff, 1);
+            return buff.toString();
         }
     }
 
@@ -1183,10 +1565,10 @@ public class MerkleTree implements Serializable
     {
         byte[] hashLeft = new byte[bytesPerHash];
         byte[] hashRigth = new byte[bytesPerHash];
-        Leaf left = new Leaf(hashLeft);
-        Leaf right = new Leaf(hashRigth);
-        Inner inner = new Inner(partitioner.getMinimumToken(), left, right);
-        inner.calc();
+        OnHeapLeaf left = new OnHeapLeaf(hashLeft);
+        OnHeapLeaf right = new OnHeapLeaf(hashRigth);
+        Inner inner = new OnHeapInner(partitioner.getMinimumToken(), left, right);
+        inner.compute();
 
         // Some partioners have variable token sizes, try to estimate as close as we can by using the same
         // heap estimate as the memtables use.
diff --git a/src/java/org/apache/cassandra/utils/MerkleTrees.java b/src/java/org/apache/cassandra/utils/MerkleTrees.java
index d2a8058..9cf04c5 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTrees.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTrees.java
@@ -44,9 +44,9 @@ public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>
 {
     public static final MerkleTreesSerializer serializer = new MerkleTreesSerializer();
 
-    private Map<Range<Token>, MerkleTree> merkleTrees = new TreeMap<>(new TokenRangeComparator());
+    private final Map<Range<Token>, MerkleTree> merkleTrees = new TreeMap<>(new TokenRangeComparator());
 
-    private IPartitioner partitioner;
+    private final IPartitioner partitioner;
 
     /**
      * Creates empty MerkleTrees object.
@@ -143,6 +143,14 @@ public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>
     }
 
     /**
+     * Dereference all merkle trees and release direct memory for all off-heap trees.
+     */
+    public void release()
+    {
+        merkleTrees.values().forEach(MerkleTree::release); merkleTrees.clear();
+    }
+
+    /**
      * Init a selected MerkleTree with an even tree distribution.
      *
      * @param range
@@ -247,11 +255,11 @@ public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>
     }
 
     /**
-     * Get an iterator for all the invalids generated by the MerkleTrees.
+     * Get an iterator for all the iterator generated by the MerkleTrees.
      *
      * @return
      */
-    public TreeRangeIterator invalids()
+    public TreeRangeIterator rangeIterator()
     {
         return new TreeRangeIterator();
     }
@@ -285,30 +293,20 @@ public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>
     @VisibleForTesting
     public byte[] hash(Range<Token> range)
     {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        boolean hashed = false;
-
-        try
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream())
         {
-            for (Range<Token> rt : merkleTrees.keySet())
-            {
-                if (rt.intersects(range))
-                {
-                    byte[] bytes = merkleTrees.get(rt).hash(range);
-                    if (bytes != null)
-                    {
-                        baos.write(bytes);
-                        hashed = true;
-                    }
-                }
-            }
+            boolean hashed = false;
+
+            for (Map.Entry<Range<Token>, MerkleTree> entry : merkleTrees.entrySet())
+                if (entry.getKey().intersects(range))
+                    hashed |= entry.getValue().ifHashesRange(range, n -> baos.write(n.hash()));
+
+            return hashed ? baos.toByteArray() : null;
         }
         catch (IOException e)
         {
             throw new RuntimeException("Unable to append merkle tree hash to result");
         }
-
-        return hashed ? baos.toByteArray() : null;
     }
 
     /**
@@ -354,7 +352,7 @@ public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>
         {
             if (it.hasNext())
             {
-                current = it.next().invalids();
+                current = it.next().rangeIterator();
 
                 return current.next();
             }
@@ -369,6 +367,17 @@ public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>
     }
 
     /**
+     * @return a new {@link MerkleTrees} instance with all trees moved off heap.
+     */
+    public MerkleTrees tryMoveOffHeap() throws IOException
+    {
+        Map<Range<Token>, MerkleTree> movedTrees = new TreeMap<>(new TokenRangeComparator());
+        for (Map.Entry<Range<Token>, MerkleTree> entry : merkleTrees.entrySet())
+            movedTrees.put(entry.getKey(), entry.getValue().tryMoveOffHeap());
+        return new MerkleTrees(partitioner, movedTrees.values());
+    }
+
+    /**
      * Get the differences between the two sets of MerkleTrees.
      *
      * @param ltree
@@ -379,9 +388,7 @@ public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>
     {
         List<Range<Token>> differences = new ArrayList<>();
         for (MerkleTree tree : ltree.merkleTrees.values())
-        {
             differences.addAll(MerkleTree.difference(tree, rtree.getMerkleTree(tree.fullRange)));
-        }
         return differences;
     }
 
@@ -392,7 +399,7 @@ public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>
             out.writeInt(trees.merkleTrees.size());
             for (MerkleTree tree : trees.merkleTrees.values())
             {
-                MerkleTree.serializer.serialize(tree, out, version);
+                tree.serialize(out, version);
             }
         }
 
@@ -405,7 +412,7 @@ public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>
             {
                 for (int i = 0; i < nTrees; i++)
                 {
-                    MerkleTree tree = MerkleTree.serializer.deserialize(in, version);
+                    MerkleTree tree = MerkleTree.deserialize(in, version);
                     trees.add(tree);
 
                     if (partitioner == null)
@@ -425,7 +432,7 @@ public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>
             long size = TypeSizes.sizeof(trees.merkleTrees.size());
             for (MerkleTree tree : trees.merkleTrees.values())
             {
-                size += MerkleTree.serializer.serializedSize(tree, version);
+                size += tree.serializedSize(version);
             }
             return size;
         }
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index e787595..443d59e 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -238,10 +238,6 @@ public class LocalSyncTaskTest extends AbstractRepairTest
         MerkleTrees tree = new MerkleTrees(partitioner);
         tree.addMerkleTrees((int) Math.pow(2, 15), desc.ranges);
         tree.init();
-        for (MerkleTree.TreeRange r : tree.invalids())
-        {
-            r.ensureHashInitialised();
-        }
         return tree;
     }
 
diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
index 78fa588..b84adaa 100644
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@ -774,10 +774,6 @@ public class RepairJobTest
         MerkleTrees tree = new MerkleTrees(MURMUR3_PARTITIONER);
         tree.addMerkleTrees((int) Math.pow(2, 15), fullRange);
         tree.init();
-        for (MerkleTree.TreeRange r : tree.invalids())
-        {
-            r.ensureHashInitialised();
-        }
 
         if (invalidate)
         {
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index aec2612..9e848a9 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -203,12 +203,14 @@ public class ValidatorTest
                                                cfs.getTableName(), Collections.singletonList(new Range<>(sstable.first.getToken(),
                                                                                                                 sstable.last.getToken())));
 
-        ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddressAndPort(),
+        InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.2");
+
+        ActiveRepairService.instance.registerParentRepairSession(repairSessionId, host,
                                                                  Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE,
                                                                  false, PreviewKind.NONE);
 
         final CompletableFuture<Message> outgoingMessageSink = registerOutgoingMessageSink();
-        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddressAndPort(), 0, true, false, PreviewKind.NONE);
+        Validator validator = new Validator(desc, host, 0, true, false, PreviewKind.NONE);
         ValidationManager.instance.submitValidation(cfs, validator);
 
         Message message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
@@ -260,12 +262,14 @@ public class ValidatorTest
                                                      cfs.getTableName(), Collections.singletonList(new Range<>(sstable.first.getToken(),
                                                                                                                sstable.last.getToken())));
 
-        ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddressAndPort(),
+        InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.2");
+
+        ActiveRepairService.instance.registerParentRepairSession(repairSessionId, host,
                                                                  Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE,
                                                                  false, PreviewKind.NONE);
 
         final CompletableFuture<Message> outgoingMessageSink = registerOutgoingMessageSink();
-        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddressAndPort(), 0, true, false, PreviewKind.NONE);
+        Validator validator = new Validator(desc, host, 0, true, false, PreviewKind.NONE);
         ValidationManager.instance.submitValidation(cfs, validator);
 
         Message message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
@@ -320,12 +324,14 @@ public class ValidatorTest
         final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
                                                      cfs.getTableName(), ranges);
 
-        ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddressAndPort(),
+        InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.2");
+
+        ActiveRepairService.instance.registerParentRepairSession(repairSessionId, host,
                                                                  Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE,
                                                                  false, PreviewKind.NONE);
 
         final CompletableFuture<Message> outgoingMessageSink = registerOutgoingMessageSink();
-        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddressAndPort(), 0, true, false, PreviewKind.NONE);
+        Validator validator = new Validator(desc, host, 0, true, false, PreviewKind.NONE);
         ValidationManager.instance.submitValidation(cfs, validator);
 
         Message message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java
index 9693010..de69cd7 100644
--- a/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java
+++ b/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java
@@ -74,9 +74,9 @@ public class DifferenceHolderTest
         mt1.init();
         mt2.init();
         // add dummy hashes to both trees
-        for (MerkleTree.TreeRange range : mt1.invalids())
+        for (MerkleTree.TreeRange range : mt1.rangeIterator())
             range.addAll(new MerkleTreesTest.HIterator(range.right));
-        for (MerkleTree.TreeRange range : mt2.invalids())
+        for (MerkleTree.TreeRange range : mt2.rangeIterator())
             range.addAll(new MerkleTreesTest.HIterator(range.right));
 
         MerkleTree.TreeRange leftmost = null;
@@ -85,7 +85,7 @@ public class DifferenceHolderTest
         mt1.maxsize(fullRange, maxsize + 2); // give some room for splitting
 
         // split the leftmost
-        Iterator<MerkleTree.TreeRange> ranges = mt1.invalids();
+        Iterator<MerkleTree.TreeRange> ranges = mt1.rangeIterator();
         leftmost = ranges.next();
         mt1.split(leftmost.right);
 
diff --git a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
index c213271..d8491d0 100644
--- a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
@@ -1,21 +1,20 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyten ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.cassandra.utils;
 
 import java.math.BigInteger;
@@ -36,10 +35,8 @@ import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.MerkleTree.Hashable;
 import org.apache.cassandra.utils.MerkleTree.RowHash;
 import org.apache.cassandra.utils.MerkleTree.TreeRange;
 import org.apache.cassandra.utils.MerkleTree.TreeRangeIterator;
@@ -49,7 +46,7 @@ import static org.junit.Assert.*;
 
 public class MerkleTreeTest
 {
-    public static byte[] DUMMY = "blah".getBytes();
+    private static final byte[] DUMMY = HashingUtils.newMessageDigest("SHA-256").digest("dummy".getBytes());
 
     /**
      * If a test assumes that the tree is 8 units wide, then it should set this value
@@ -68,6 +65,9 @@ public class MerkleTreeTest
     @Before
     public void setup()
     {
+        DatabaseDescriptor.clientInitialization();
+        DatabaseDescriptor.setOffheapMerkleTreesEnabled(false);
+
         TOKEN_SCALE = new BigInteger("8");
         partitioner = RandomPartitioner.instance;
         // TODO need to trickle TokenSerializer
@@ -171,7 +171,7 @@ public class MerkleTreeTest
         Iterator<TreeRange> ranges;
 
         // (zero, zero]
-        ranges = mt.invalids();
+        ranges = mt.rangeIterator();
         assertEquals(new Range<>(tok(-1), tok(-1)), ranges.next());
         assertFalse(ranges.hasNext());
 
@@ -181,7 +181,7 @@ public class MerkleTreeTest
         mt.split(tok(6));
         mt.split(tok(3));
         mt.split(tok(5));
-        ranges = mt.invalids();
+        ranges = mt.rangeIterator();
         assertEquals(new Range<>(tok(6), tok(-1)), ranges.next());
         assertEquals(new Range<>(tok(-1), tok(2)), ranges.next());
         assertEquals(new Range<>(tok(2), tok(3)), ranges.next());
@@ -200,7 +200,7 @@ public class MerkleTreeTest
         Range<Token> range = new Range<>(tok(-1), tok(-1));
 
         // (zero, zero]
-        assertNull(mt.hash(range));
+        assertFalse(mt.hashesRange(range));
 
         // validate the range
         mt.get(tok(-1)).hash(val);
@@ -223,11 +223,12 @@ public class MerkleTreeTest
         // (zero,two] (two,four] (four, zero]
         mt.split(tok(4));
         mt.split(tok(2));
-        assertNull(mt.hash(left));
-        assertNull(mt.hash(partial));
-        assertNull(mt.hash(right));
-        assertNull(mt.hash(linvalid));
-        assertNull(mt.hash(rinvalid));
+
+        assertFalse(mt.hashesRange(left));
+        assertFalse(mt.hashesRange(partial));
+        assertFalse(mt.hashesRange(right));
+        assertFalse(mt.hashesRange(linvalid));
+        assertFalse(mt.hashesRange(rinvalid));
 
         // validate the range
         mt.get(tok(2)).hash(val);
@@ -237,8 +238,8 @@ public class MerkleTreeTest
         assertHashEquals(leftval, mt.hash(left));
         assertHashEquals(partialval, mt.hash(partial));
         assertHashEquals(val, mt.hash(right));
-        assertNull(mt.hash(linvalid));
-        assertNull(mt.hash(rinvalid));
+        assertFalse(mt.hashesRange(linvalid));
+        assertFalse(mt.hashesRange(rinvalid));
     }
 
     @Test
@@ -258,10 +259,6 @@ public class MerkleTreeTest
         mt.split(tok(2));
         mt.split(tok(6));
         mt.split(tok(1));
-        assertNull(mt.hash(full));
-        assertNull(mt.hash(lchild));
-        assertNull(mt.hash(rchild));
-        assertNull(mt.hash(invalid));
 
         // validate the range
         mt.get(tok(1)).hash(val);
@@ -270,10 +267,14 @@ public class MerkleTreeTest
         mt.get(tok(6)).hash(val);
         mt.get(tok(-1)).hash(val);
 
+        assertTrue(mt.hashesRange(full));
+        assertTrue(mt.hashesRange(lchild));
+        assertTrue(mt.hashesRange(rchild));
+        assertFalse(mt.hashesRange(invalid));
+
         assertHashEquals(fullval, mt.hash(full));
         assertHashEquals(lchildval, mt.hash(lchild));
         assertHashEquals(rchildval, mt.hash(rchild));
-        assertNull(mt.hash(invalid));
     }
 
     @Test
@@ -294,9 +295,6 @@ public class MerkleTreeTest
         mt.split(tok(4));
         mt.split(tok(2));
         mt.split(tok(1));
-        assertNull(mt.hash(full));
-        assertNull(mt.hash(childfull));
-        assertNull(mt.hash(invalid));
 
         // validate the range
         mt.get(tok(1)).hash(val);
@@ -306,9 +304,12 @@ public class MerkleTreeTest
         mt.get(tok(16)).hash(val);
         mt.get(tok(-1)).hash(val);
 
+        assertTrue(mt.hashesRange(full));
+        assertTrue(mt.hashesRange(childfull));
+        assertFalse(mt.hashesRange(invalid));
+
         assertHashEquals(fullval, mt.hash(full));
         assertHashEquals(childfullval, mt.hash(childfull));
-        assertNull(mt.hash(invalid));
     }
 
     @Test
@@ -326,7 +327,7 @@ public class MerkleTreeTest
         }
 
         // validate the tree
-        TreeRangeIterator ranges = mt.invalids();
+        TreeRangeIterator ranges = mt.rangeIterator();
         for (TreeRange range : ranges)
             range.addHash(new RowHash(range.right, new byte[0], 0));
 
@@ -355,7 +356,7 @@ public class MerkleTreeTest
         mt.split(tok(6));
         mt.split(tok(10));
 
-        ranges = mt.invalids();
+        ranges = mt.rangeIterator();
         ranges.next().addAll(new HIterator(2, 4)); // (-1,4]: depth 2
         ranges.next().addAll(new HIterator(6)); // (4,6]
         ranges.next().addAll(new HIterator(8)); // (6,8]
@@ -372,7 +373,7 @@ public class MerkleTreeTest
         mt2.split(tok(9));
         mt2.split(tok(11));
 
-        ranges = mt2.invalids();
+        ranges = mt2.rangeIterator();
         ranges.next().addAll(new HIterator(2)); // (-1,2]
         ranges.next().addAll(new HIterator(4)); // (2,4]
         ranges.next().addAll(new HIterator(6, 8)); // (4,8]: depth 2
@@ -395,19 +396,33 @@ public class MerkleTreeTest
         // populate and validate the tree
         mt.maxsize(256);
         mt.init();
-        for (TreeRange range : mt.invalids())
+        for (TreeRange range : mt.rangeIterator())
             range.addAll(new HIterator(range.right));
 
         byte[] initialhash = mt.hash(full);
 
         DataOutputBuffer out = new DataOutputBuffer();
-        MerkleTree.serializer.serialize(mt, out, MessagingService.current_version);
+        mt.serialize(out, MessagingService.current_version);
         byte[] serialized = out.toByteArray();
 
-        DataInputPlus in = new DataInputBuffer(serialized);
-        MerkleTree restored = MerkleTree.serializer.deserialize(in, MessagingService.current_version);
+        MerkleTree restoredOnHeap =
+            MerkleTree.deserialize(new DataInputBuffer(serialized), false, MessagingService.current_version);
+        MerkleTree restoredOffHeap =
+            MerkleTree.deserialize(new DataInputBuffer(serialized), true, MessagingService.current_version);
+        MerkleTree movedOffHeap = mt.tryMoveOffHeap();
+
+        assertHashEquals(initialhash, restoredOnHeap.hash(full));
+        assertHashEquals(initialhash, restoredOffHeap.hash(full));
+        assertHashEquals(initialhash, movedOffHeap.hash(full));
+
+        assertEquals(mt, restoredOnHeap);
+        assertEquals(mt, restoredOffHeap);
+        assertEquals(mt, movedOffHeap);
+
+        assertEquals(restoredOnHeap, restoredOffHeap);
+        assertEquals(restoredOnHeap, movedOffHeap);
 
-        assertHashEquals(initialhash, restored.hash(full));
+        assertEquals(restoredOffHeap, movedOffHeap);
     }
 
     @Test
@@ -420,9 +435,9 @@ public class MerkleTreeTest
         mt2.init();
 
         // add dummy hashes to both trees
-        for (TreeRange range : mt.invalids())
+        for (TreeRange range : mt.rangeIterator())
             range.addAll(new HIterator(range.right));
-        for (TreeRange range : mt2.invalids())
+        for (TreeRange range : mt2.rangeIterator())
             range.addAll(new HIterator(range.right));
 
         TreeRange leftmost = null;
@@ -431,7 +446,7 @@ public class MerkleTreeTest
         mt.maxsize(maxsize + 2); // give some room for splitting
 
         // split the leftmost
-        Iterator<TreeRange> ranges = mt.invalids();
+        Iterator<TreeRange> ranges = mt.rangeIterator();
         leftmost = ranges.next();
         mt.split(leftmost.right);
 
@@ -465,18 +480,18 @@ public class MerkleTreeTest
         byte[] h2 = "hjkl".getBytes();
 
         // add dummy hashes to both trees
-        for (TreeRange tree : ltree.invalids())
+        for (TreeRange tree : ltree.rangeIterator())
         {
             tree.addHash(new RowHash(range.right, h1, h1.length));
         }
-        for (TreeRange tree : rtree.invalids())
+        for (TreeRange tree : rtree.rangeIterator())
         {
             tree.addHash(new RowHash(range.right, h2, h2.length));
         }
 
         List<TreeRange> diffs = MerkleTree.difference(ltree, rtree);
         assertEquals(Lists.newArrayList(range), diffs);
-        assertEquals(MerkleTree.FULLY_INCONSISTENT, MerkleTree.differenceHelper(ltree, rtree, new ArrayList<>(), new MerkleTree.TreeDifference(ltree.fullRange.left, ltree.fullRange.right, (byte) 0)));
+        assertEquals(MerkleTree.FULLY_INCONSISTENT, MerkleTree.differenceHelper(ltree, rtree, new ArrayList<>(), new MerkleTree.TreeRange(ltree.fullRange.left, ltree.fullRange.right, (byte)0)));
     }
 
     /**
@@ -499,11 +514,11 @@ public class MerkleTreeTest
 
 
         // add dummy hashes to both trees
-        for (TreeRange tree : ltree.invalids())
+        for (TreeRange tree : ltree.rangeIterator())
         {
             tree.addHash(new RowHash(range.right, h1, h1.length));
         }
-        for (TreeRange tree : rtree.invalids())
+        for (TreeRange tree : rtree.rangeIterator())
         {
             tree.addHash(new RowHash(range.right, h2, h2.length));
         }
@@ -533,7 +548,7 @@ public class MerkleTreeTest
             while (depth.equals(dstack.peek()))
             {
                 // consume the stack
-                hash = Hashable.binaryHash(hstack.pop(), hash);
+                hash = FBUtilities.xor(hstack.pop(), hash);
                 depth = dstack.pop() - 1;
             }
             dstack.push(depth);
diff --git a/test/unit/org/apache/cassandra/utils/MerkleTreesTest.java b/test/unit/org/apache/cassandra/utils/MerkleTreesTest.java
index b40f6c4..a1f6068 100644
--- a/test/unit/org/apache/cassandra/utils/MerkleTreesTest.java
+++ b/test/unit/org/apache/cassandra/utils/MerkleTreesTest.java
@@ -34,7 +34,6 @@ import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.MerkleTree.Hashable;
 import org.apache.cassandra.utils.MerkleTree.RowHash;
 import org.apache.cassandra.utils.MerkleTree.TreeRange;
 import org.apache.cassandra.utils.MerkleTrees.TreeRangeIterator;
@@ -43,7 +42,7 @@ import static org.junit.Assert.*;
 
 public class MerkleTreesTest
 {
-    public static byte[] DUMMY = "blah".getBytes();
+    private static final byte[] DUMMY = HashingUtils.newMessageDigest("SHA-256").digest("dummy".getBytes());
 
     /**
      * If a test assumes that the tree is 8 units wide, then it should set this value
@@ -193,7 +192,7 @@ public class MerkleTreesTest
         Iterator<TreeRange> ranges;
 
         // (zero, zero]
-        ranges = mts.invalids();
+        ranges = mts.rangeIterator();
         assertEquals(new Range<>(tok(-1), tok(-1)), ranges.next());
         assertFalse(ranges.hasNext());
 
@@ -203,7 +202,7 @@ public class MerkleTreesTest
         mts.split(tok(6));
         mts.split(tok(3));
         mts.split(tok(5));
-        ranges = mts.invalids();
+        ranges = mts.rangeIterator();
         assertEquals(new Range<>(tok(6), tok(-1)), ranges.next());
         assertEquals(new Range<>(tok(-1), tok(2)), ranges.next());
         assertEquals(new Range<>(tok(2), tok(3)), ranges.next());
@@ -245,11 +244,6 @@ public class MerkleTreesTest
         // (zero,two] (two,four] (four, zero]
         mts.split(tok(4));
         mts.split(tok(2));
-        assertNull(mts.hash(left));
-        assertNull(mts.hash(partial));
-        assertNull(mts.hash(right));
-        assertNull(mts.hash(linvalid));
-        assertNull(mts.hash(rinvalid));
 
         // validate the range
         mts.get(tok(2)).hash(val);
@@ -280,10 +274,6 @@ public class MerkleTreesTest
         mts.split(tok(2));
         mts.split(tok(6));
         mts.split(tok(1));
-        assertNull(mts.hash(full));
-        assertNull(mts.hash(lchild));
-        assertNull(mts.hash(rchild));
-        assertNull(mts.hash(invalid));
 
         // validate the range
         mts.get(tok(1)).hash(val);
@@ -315,9 +305,6 @@ public class MerkleTreesTest
         mts.split(tok(4));
         mts.split(tok(2));
         mts.split(tok(1));
-        assertNull(mts.hash(full));
-        assertNull(mts.hash(childfull));
-        assertNull(mts.hash(invalid));
 
         // validate the range
         mts.get(tok(1)).hash(val);
@@ -349,7 +336,7 @@ public class MerkleTreesTest
         }
 
         // validate the tree
-        TreeRangeIterator ranges = mts.invalids();
+        TreeRangeIterator ranges = mts.rangeIterator();
         for (TreeRange range : ranges)
             range.addHash(new RowHash(range.right, new byte[0], 0));
 
@@ -378,13 +365,16 @@ public class MerkleTreesTest
         mts.split(tok(6));
         mts.split(tok(10));
 
-        ranges = mts.invalids();
-        ranges.next().addAll(new HIterator(2, 4)); // (-1,4]: depth 2
-        ranges.next().addAll(new HIterator(6)); // (4,6]
-        ranges.next().addAll(new HIterator(8)); // (6,8]
+        int seed = 123456789;
+
+        Random random1 = new Random(seed);
+        ranges = mts.rangeIterator();
+        ranges.next().addAll(new HIterator(random1, 2, 4)); // (-1,4]: depth 2
+        ranges.next().addAll(new HIterator(random1, 6)); // (4,6]
+        ranges.next().addAll(new HIterator(random1, 8)); // (6,8]
         ranges.next().addAll(new HIterator(/*empty*/ new int[0])); // (8,10]
-        ranges.next().addAll(new HIterator(12)); // (10,12]
-        ranges.next().addAll(new HIterator(14, -1)); // (12,-1]: depth 2
+        ranges.next().addAll(new HIterator(random1, 12)); // (10,12]
+        ranges.next().addAll(new HIterator(random1, 14, -1)); // (12,-1]: depth 2
 
 
         mts2.split(tok(8));
@@ -395,15 +385,16 @@ public class MerkleTreesTest
         mts2.split(tok(9));
         mts2.split(tok(11));
 
-        ranges = mts2.invalids();
-        ranges.next().addAll(new HIterator(2)); // (-1,2]
-        ranges.next().addAll(new HIterator(4)); // (2,4]
-        ranges.next().addAll(new HIterator(6, 8)); // (4,8]: depth 2
+        Random random2 = new Random(seed);
+        ranges = mts2.rangeIterator();
+        ranges.next().addAll(new HIterator(random2, 2)); // (-1,2]
+        ranges.next().addAll(new HIterator(random2, 4)); // (2,4]
+        ranges.next().addAll(new HIterator(random2, 6, 8)); // (4,8]: depth 2
         ranges.next().addAll(new HIterator(/*empty*/ new int[0])); // (8,9]
         ranges.next().addAll(new HIterator(/*empty*/ new int[0])); // (9,10]
         ranges.next().addAll(new HIterator(/*empty*/ new int[0])); // (10,11]: depth 4
-        ranges.next().addAll(new HIterator(12)); // (11,12]: depth 4
-        ranges.next().addAll(new HIterator(14, -1)); // (12,-1]: depth 2
+        ranges.next().addAll(new HIterator(random2, 12)); // (11,12]: depth 4
+        ranges.next().addAll(new HIterator(random2, 14, -1)); // (12,-1]: depth 2
 
         byte[] mthash = mts.hash(full);
         byte[] mt2hash = mts2.hash(full);
@@ -425,7 +416,7 @@ public class MerkleTreesTest
 
         // populate and validate the tree
         mts.init();
-        for (TreeRange range : mts.invalids())
+        for (TreeRange range : mts.rangeIterator())
             range.addAll(new HIterator(range.right));
 
         byte[] initialhash = mts.hash(first);
@@ -456,11 +447,15 @@ public class MerkleTreesTest
         mts.init();
         mts2.init();
 
+        int seed = 123456789;
         // add dummy hashes to both trees
-        for (TreeRange range : mts.invalids())
-            range.addAll(new HIterator(range.right));
-        for (TreeRange range : mts2.invalids())
-            range.addAll(new HIterator(range.right));
+        Random random1 = new Random(seed);
+        for (TreeRange range : mts.rangeIterator())
+            range.addAll(new HIterator(random1, range.right));
+
+        Random random2 = new Random(seed);
+        for (TreeRange range : mts2.rangeIterator())
+            range.addAll(new HIterator(random2, range.right));
 
         TreeRange leftmost = null;
         TreeRange middle = null;
@@ -468,7 +463,7 @@ public class MerkleTreesTest
         mts.maxsize(fullRange(), maxsize + 2); // give some room for splitting
 
         // split the leftmost
-        Iterator<TreeRange> ranges = mts.invalids();
+        Iterator<TreeRange> ranges = mts.rangeIterator();
         leftmost = ranges.next();
         mts.split(leftmost.right);
 
@@ -504,7 +499,7 @@ public class MerkleTreesTest
             while (depth.equals(dstack.peek()))
             {
                 // consume the stack
-                hash = Hashable.binaryHash(hstack.pop(), hash);
+                hash = FBUtilities.xor(hstack.pop(), hash);
                 depth = dstack.pop()-1;
             }
             dstack.push(depth);
@@ -516,25 +511,47 @@ public class MerkleTreesTest
 
     public static class HIterator extends AbstractIterator<RowHash>
     {
-        private Iterator<Token> tokens;
+        private final Random random;
+        private final Iterator<Token> tokens;
 
-        public HIterator(int... tokens)
+        HIterator(int... tokens)
         {
-            List<Token> tlist = new LinkedList<Token>();
+            this(new Random(), tokens);
+        }
+
+        HIterator(Random random, int... tokens)
+        {
+            List<Token> tlist = new ArrayList<>(tokens.length);
             for (int token : tokens)
                 tlist.add(tok(token));
             this.tokens = tlist.iterator();
+            this.random = random;
         }
 
         public HIterator(Token... tokens)
         {
-            this.tokens = Arrays.asList(tokens).iterator();
+            this(new Random(), tokens);
+        }
+
+        HIterator(Random random, Token... tokens)
+        {
+            this(random, Arrays.asList(tokens).iterator());
+        }
+
+        private HIterator(Random random, Iterator<Token> tokens)
+        {
+            this.random = random;
+            this.tokens = tokens;
         }
 
         public RowHash computeNext()
         {
             if (tokens.hasNext())
-                return new RowHash(tokens.next(), DUMMY, DUMMY.length);
+            {
+                byte[] digest = new byte[32];
+                random.nextBytes(digest);
+                return new RowHash(tokens.next(), digest, 12345L);
+            }
             return endOfData();
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 09/09: Marcus review feedback #2

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch 15202-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 26691276dd7b0b80f1a9d4bbf6f6030490f1a42f
Author: Aleksey Yeshchenko <al...@apache.org>
AuthorDate: Fri Aug 2 14:18:18 2019 +0100

    Marcus review feedback #2
---
 src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java b/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
index c9b7ed7..f85c2eb 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
@@ -51,9 +51,11 @@ public class DifferenceHolder
                 TreeResponse r2 = trees.get(j);
                 hd.add(r2.endpoint, MerkleTrees.difference(r1.trees, r2.trees));
             }
+            r1.trees.release();
             // and add them to the diff map
             diffBuilder.put(r1.endpoint, hd);
         }
+        trees.get(trees.size() - 1).trees.release();
         differences = diffBuilder.build();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 06/09: Consistent naming for offheap MT toggle param and methods

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch 15202-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9f56a149a782cf5d683973faa50baf3312c1bcaf
Author: Aleksey Yeshchenko <al...@apache.org>
AuthorDate: Wed Jul 31 13:34:02 2019 +0100

    Consistent naming for offheap MT toggle param and methods
---
 src/java/org/apache/cassandra/config/Config.java               |  2 +-
 src/java/org/apache/cassandra/config/DatabaseDescriptor.java   | 10 +++++-----
 src/java/org/apache/cassandra/service/ActiveRepairService.java |  9 ++++-----
 .../org/apache/cassandra/service/ActiveRepairServiceMBean.java |  3 +++
 src/java/org/apache/cassandra/utils/MerkleTree.java            |  4 ++--
 test/unit/org/apache/cassandra/utils/MerkleTreeTest.java       |  2 +-
 6 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index b56419d..a1fdfdc 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -130,7 +130,7 @@ public class Config
     public volatile Integer repair_session_max_tree_depth = null;
     public volatile Integer repair_session_space_in_mb = null;
 
-    public volatile boolean prefer_offheap_merkle_trees = true;
+    public volatile boolean use_offheap_merkle_trees = true;
 
     public int storage_port = 7000;
     public int ssl_storage_port = 7001;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 49761da..43203e5 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2908,14 +2908,14 @@ public class DatabaseDescriptor
         return strictRuntimeChecks;
     }
 
-    public static boolean getOffheapMerkleTreesEnabled()
+    public static boolean useOffheapMerkleTrees()
     {
-        return conf.prefer_offheap_merkle_trees;
+        return conf.use_offheap_merkle_trees;
     }
 
-    public static void setOffheapMerkleTreesEnabled(boolean value)
+    public static void useOffheapMerkleTrees(boolean value)
     {
-        logger.info("Setting prefer_offheap_merkle_trees to {}", value);
-        conf.prefer_offheap_merkle_trees = value;
+        logger.info("Setting use_offheap_merkle_trees to {}", value);
+        conf.use_offheap_merkle_trees = value;
     }
 }
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 689771c..409d799 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -71,7 +71,6 @@ import org.apache.cassandra.repair.consistent.LocalSessions;
 import org.apache.cassandra.repair.messages.*;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.CassandraVersion;
-import org.apache.cassandra.utils.MonotonicClock;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.Pair;
@@ -248,14 +247,14 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         return session;
     }
 
-    public boolean getOffheapMerkleTreesEnabled()
+    public boolean getUseOffheapMerkleTrees()
     {
-        return DatabaseDescriptor.getOffheapMerkleTreesEnabled();
+        return DatabaseDescriptor.useOffheapMerkleTrees();
     }
 
-    public void setOffheapMerkleTreesEnabled(boolean enabled)
+    public void setUseOffheapMerkleTrees(boolean value)
     {
-        DatabaseDescriptor.setOffheapMerkleTreesEnabled(enabled);
+        DatabaseDescriptor.useOffheapMerkleTrees(value);
     }
 
     private <T extends AbstractFuture &
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
index 283d466..d967280 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
@@ -30,4 +30,7 @@ public interface ActiveRepairServiceMBean
 
     public void setRepairSessionSpaceInMegabytes(int sizeInMegabytes);
     public int getRepairSessionSpaceInMegabytes();
+
+    public boolean getUseOffheapMerkleTrees();
+    public void setUseOffheapMerkleTrees(boolean value);
 }
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 68be2d1..45d14a6 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -690,7 +690,7 @@ public class MerkleTree
 
     public static MerkleTree deserialize(DataInputPlus in, int version) throws IOException
     {
-        return deserialize(in, DatabaseDescriptor.getOffheapMerkleTreesEnabled(), version);
+        return deserialize(in, DatabaseDescriptor.useOffheapMerkleTrees(), version);
     }
 
     public static MerkleTree deserialize(DataInputPlus in, boolean offHeapRequested, int version) throws IOException
@@ -754,7 +754,7 @@ public class MerkleTree
      */
     MerkleTree tryMoveOffHeap() throws IOException
     {
-        return root instanceof OnHeapNode && shouldUseOffHeapTrees(partitioner, DatabaseDescriptor.getOffheapMerkleTreesEnabled())
+        return root instanceof OnHeapNode && shouldUseOffHeapTrees(partitioner, DatabaseDescriptor.useOffheapMerkleTrees())
              ? moveOffHeap()
              : this;
     }
diff --git a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
index 6ce2652..0ec6858 100644
--- a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
@@ -71,7 +71,7 @@ public class MerkleTreeTest
     public void setup()
     {
         DatabaseDescriptor.clientInitialization();
-        DatabaseDescriptor.setOffheapMerkleTreesEnabled(false);
+        DatabaseDescriptor.useOffheapMerkleTrees(false);
 
         TOKEN_SCALE = new BigInteger("8");
         partitioner = RandomPartitioner.instance;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 05/09: Some MT.difference() optimisations / more Marcus's feedback

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch 15202-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 3f5bb3ba9d732cb04c5b49cc08d9d64ac549bf73
Author: Aleksey Yeshchenko <al...@apache.org>
AuthorDate: Mon Jul 29 17:01:22 2019 +0100

    Some MT.difference() optimisations / more Marcus's feedback
---
 .../org/apache/cassandra/utils/MerkleTree.java     | 36 +++++++++++++---------
 1 file changed, 21 insertions(+), 15 deletions(-)

diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index bdeaa3f..68be2d1 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -72,11 +72,11 @@ public class MerkleTree
 {
     private static final Logger logger = LoggerFactory.getLogger(MerkleTree.class);
 
-    private static final int HASH_SIZE = 32; // SHA-256 = 32 bytes.
+    private static final int HASH_SIZE = 32; // 2xMM3_128 = 32 bytes.
     private static final byte[] EMPTY_HASH = new byte[HASH_SIZE];
 
     /*
-     * Thread-local byte array, large enough to host SHA256 bytes or MM3/Random partitoners' tokens
+     * Thread-local byte array, large enough to host 32B of digest or MM3/Random partitoners' tokens
      */
     private static final ThreadLocal<byte[]> byteArray = ThreadLocal.withInitial(() -> new byte[HASH_SIZE]);
 
@@ -201,11 +201,15 @@ public class MerkleTree
         if (!ltree.fullRange.equals(rtree.fullRange))
             throw new IllegalArgumentException("Difference only make sense on tree covering the same range (but " + ltree.fullRange + " != " + rtree.fullRange + ')');
 
+        // ensure on-heap trees' inner node hashes have been computed
+        ltree.fillInnerHashes();
+        rtree.fillInnerHashes();
+
         List<TreeRange> diff = new ArrayList<>();
         TreeRange active = new TreeRange(ltree.fullRange.left, ltree.fullRange.right, 0);
 
-        Node lnode = ltree.find(active);
-        Node rnode = rtree.find(active);
+        Node lnode = ltree.root;
+        Node rnode = rtree.root;
 
         if (lnode.hashesDiffer(rnode))
         {
@@ -366,11 +370,12 @@ public class MerkleTree
             assert current instanceof Inner;
             Inner inner = (Inner) current;
 
-            Range<Token> leftRange  = new Range<>(activeRange.left, inner.token());
-            Range<Token> rightRange = new Range<>(inner.token(), activeRange.right);
-
             if (find.contains(activeRange)) // this node is fully contained in the range
-                return inner.compute();
+                return inner.fillInnerHashes();
+
+            Token midpoint = inner.token();
+            Range<Token>  leftRange = new Range<>(activeRange.left, midpoint);
+            Range<Token> rightRange = new Range<>(midpoint, activeRange.right);
 
             // else: one of our children contains the range
 
@@ -757,6 +762,7 @@ public class MerkleTree
     private MerkleTree moveOffHeap() throws IOException
     {
         assert root instanceof OnHeapNode;
+        root.fillInnerHashes(); // ensure on-heap trees' inner node hashes have been computed
         ByteBuffer buffer = allocate(Ints.checkedCast(size), partitioner);
         int pointer = ((OnHeapNode) root).serializeOffHeap(buffer, partitioner);
         OffHeapNode newRoot = fromPointer(pointer, buffer, partitioner);
@@ -790,7 +796,7 @@ public class MerkleTree
 
         boolean hashesDiffer(Node other);
 
-        default Node compute()
+        default Node fillInnerHashes()
         {
             return this;
         }
@@ -1265,12 +1271,12 @@ public class MerkleTree
         }
 
         @Override
-        public Node compute()
+        public Node fillInnerHashes()
         {
             if (!computed) // hash and size haven't been calculated; compute children then compute this
             {
-                left.compute();
-                right.compute();
+                left.fillInnerHashes();
+                right.fillInnerHashes();
 
                 if (!left.hasEmptyHash() && !right.hasEmptyHash())
                     hash = xor(left.hash(), right.hash());
@@ -1503,7 +1509,7 @@ public class MerkleTree
         OnHeapLeaf left = new OnHeapLeaf(hashLeft);
         OnHeapLeaf right = new OnHeapLeaf(hashRigth);
         Inner inner = new OnHeapInner(partitioner.getMinimumToken(), left, right);
-        inner.compute();
+        inner.fillInnerHashes();
 
         // Some partioners have variable token sizes, try to estimate as close as we can by using the same
         // heap estimate as the memtables use.
@@ -1635,8 +1641,8 @@ public class MerkleTree
     }
 
     @VisibleForTesting
-    void compute()
+    void fillInnerHashes()
     {
-        root.compute();
+        root.fillInnerHashes();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org