Posted to commits@cassandra.apache.org by sa...@apache.org on 2020/01/23 10:18:19 UTC

[cassandra] branch trunk updated (e61c09e -> 9a28051)

This is an automated email from the ASF dual-hosted git repository.

samt pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from e61c09e  Fix potentially flaky ImportTest
     new 39eb7db  Exclude legacy counters from repaired data digest
     new 9a28051  Exclude purgeable tombstones from repaired data digest

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   2 +
 .../cassandra/db/AbstractClusteringPrefix.java     |  10 +-
 .../org/apache/cassandra/db/ClusteringPrefix.java  |   8 +-
 src/java/org/apache/cassandra/db/Columns.java      |   6 +-
 src/java/org/apache/cassandra/db/DeletionTime.java |   6 +-
 src/java/org/apache/cassandra/db/Digest.java       | 200 ++++++++++++
 src/java/org/apache/cassandra/db/LivenessInfo.java |  17 +-
 .../cassandra/db/PartitionRangeReadCommand.java    |   2 +-
 src/java/org/apache/cassandra/db/ReadCommand.java  | 164 ++++++++--
 src/java/org/apache/cassandra/db/ReadResponse.java |   8 +-
 .../cassandra/db/SinglePartitionReadCommand.java   |   2 +-
 .../cassandra/db/context/CounterContext.java       |  20 +-
 .../db/partitions/PartitionIterators.java          |   1 -
 .../cassandra/db/partitions/PurgeFunction.java     |   7 +-
 .../partitions/UnfilteredPartitionIterators.java   |   8 +-
 .../org/apache/cassandra/db/rows/AbstractCell.java |  24 +-
 .../org/apache/cassandra/db/rows/AbstractRow.java  |  15 +-
 .../org/apache/cassandra/db/rows/CellPath.java     |   8 +-
 .../org/apache/cassandra/db/rows/ColumnData.java   |   7 +-
 .../cassandra/db/rows/ComplexColumnData.java       |   8 +-
 .../db/rows/RangeTombstoneBoundMarker.java         |   8 +-
 .../db/rows/RangeTombstoneBoundaryMarker.java      |  10 +-
 src/java/org/apache/cassandra/db/rows/Row.java     |   8 +-
 .../org/apache/cassandra/db/rows/RowIterators.java |  17 +-
 .../org/apache/cassandra/db/rows/Unfiltered.java   |   9 +-
 .../cassandra/db/rows/UnfilteredRowIterators.java  |  20 +-
 .../apache/cassandra/dht/RandomPartitioner.java    |   3 +-
 .../org/apache/cassandra/repair/Validator.java     | 165 +---------
 .../apache/cassandra/schema/SchemaConstants.java   |   4 +-
 .../apache/cassandra/schema/SchemaKeyspace.java    |   9 +-
 .../org/apache/cassandra/utils/FBUtilities.java    |  14 +
 .../org/apache/cassandra/utils/HashingUtils.java   | 109 -------
 src/java/org/apache/cassandra/utils/MD5Digest.java |   2 +-
 src/java/org/apache/cassandra/utils/UUIDGen.java   |  24 +-
 .../distributed/test/RepairDigestTrackingTest.java |  54 ++++
 .../apache/cassandra/cache/CacheProviderTest.java  |  15 +-
 .../org/apache/cassandra/db/CounterCellTest.java   |  17 +-
 test/unit/org/apache/cassandra/db/DigestTest.java  | 122 +++++++
 .../org/apache/cassandra/db/PartitionTest.java     |  37 ++-
 .../org/apache/cassandra/db/ReadCommandTest.java   | 354 ++++++++++++++++++++-
 .../org/apache/cassandra/db/RowUpdateBuilder.java  |  10 +
 .../org/apache/cassandra/repair/ValidatorTest.java |  47 +--
 .../repair/asymmetric/DifferenceHolderTest.java    |   6 +-
 .../apache/cassandra/utils/HashingUtilsTest.java   |  92 ------
 .../org/apache/cassandra/utils/MerkleTreeTest.java |   7 +-
 .../apache/cassandra/utils/MerkleTreesTest.java    |   6 +-
 46 files changed, 1063 insertions(+), 629 deletions(-)
 create mode 100644 src/java/org/apache/cassandra/db/Digest.java
 delete mode 100644 src/java/org/apache/cassandra/utils/HashingUtils.java
 create mode 100644 test/unit/org/apache/cassandra/db/DigestTest.java
 delete mode 100644 test/unit/org/apache/cassandra/utils/HashingUtilsTest.java




[cassandra] 01/02: Exclude legacy counters from repaired data digest

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 39eb7db65fd45653fdece1087ba75c3356a10c97
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Tue Dec 17 17:56:18 2019 +0000

    Exclude legacy counters from repaired data digest
    
    Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-15461
---
 CHANGES.txt                                        |   1 +
 .../cassandra/db/AbstractClusteringPrefix.java     |  10 +-
 .../org/apache/cassandra/db/ClusteringPrefix.java  |   8 +-
 src/java/org/apache/cassandra/db/Columns.java      |   6 +-
 src/java/org/apache/cassandra/db/DeletionTime.java |   6 +-
 src/java/org/apache/cassandra/db/Digest.java       | 200 +++++++++++++++++++++
 src/java/org/apache/cassandra/db/LivenessInfo.java |  17 +-
 src/java/org/apache/cassandra/db/ReadCommand.java  |  14 +-
 src/java/org/apache/cassandra/db/ReadResponse.java |   8 +-
 .../cassandra/db/context/CounterContext.java       |  20 +--
 .../db/partitions/PartitionIterators.java          |   1 -
 .../partitions/UnfilteredPartitionIterators.java   |   8 +-
 .../org/apache/cassandra/db/rows/AbstractCell.java |  24 +--
 .../org/apache/cassandra/db/rows/AbstractRow.java  |  15 +-
 .../org/apache/cassandra/db/rows/CellPath.java     |   8 +-
 .../org/apache/cassandra/db/rows/ColumnData.java   |   7 +-
 .../cassandra/db/rows/ComplexColumnData.java       |   8 +-
 .../db/rows/RangeTombstoneBoundMarker.java         |   8 +-
 .../db/rows/RangeTombstoneBoundaryMarker.java      |  10 +-
 src/java/org/apache/cassandra/db/rows/Row.java     |   8 +-
 .../org/apache/cassandra/db/rows/RowIterators.java |  17 +-
 .../org/apache/cassandra/db/rows/Unfiltered.java   |   9 +-
 .../cassandra/db/rows/UnfilteredRowIterators.java  |  20 +--
 .../apache/cassandra/dht/RandomPartitioner.java    |   3 +-
 .../org/apache/cassandra/repair/Validator.java     | 165 +----------------
 .../apache/cassandra/schema/SchemaConstants.java   |   4 +-
 .../apache/cassandra/schema/SchemaKeyspace.java    |   9 +-
 .../org/apache/cassandra/utils/FBUtilities.java    |  14 ++
 .../org/apache/cassandra/utils/HashingUtils.java   | 109 -----------
 src/java/org/apache/cassandra/utils/MD5Digest.java |   2 +-
 src/java/org/apache/cassandra/utils/UUIDGen.java   |  24 ++-
 .../apache/cassandra/cache/CacheProviderTest.java  |  15 +-
 .../org/apache/cassandra/db/CounterCellTest.java   |  17 +-
 test/unit/org/apache/cassandra/db/DigestTest.java  | 122 +++++++++++++
 .../org/apache/cassandra/db/PartitionTest.java     |  37 ++--
 .../org/apache/cassandra/db/ReadCommandTest.java   |  88 ++++++++-
 .../org/apache/cassandra/db/RowUpdateBuilder.java  |  10 ++
 .../org/apache/cassandra/repair/ValidatorTest.java |  47 +----
 .../repair/asymmetric/DifferenceHolderTest.java    |   6 +-
 .../apache/cassandra/utils/HashingUtilsTest.java   |  92 ----------
 .../org/apache/cassandra/utils/MerkleTreeTest.java |   7 +-
 .../apache/cassandra/utils/MerkleTreesTest.java    |   6 +-
 42 files changed, 598 insertions(+), 612 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 85ddd53..3d5217a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha3
+ * Exclude legacy counter shards from repaired data tracking (CASSANDRA-15461)
  * Make it easier to add trace headers to messages (CASSANDRA-15499)
  * Fix and optimise partial compressed sstable streaming (CASSANDRA-13938)
  * Improve error when JVM 11 can't access required modules (CASSANDRA-15468)
diff --git a/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java b/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
index 884a091..8714936 100644
--- a/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
@@ -20,10 +20,6 @@ package org.apache.cassandra.db;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
-import com.google.common.hash.Hasher;
-
-import org.apache.cassandra.utils.HashingUtils;
-
 public abstract class AbstractClusteringPrefix implements ClusteringPrefix
 {
     public ClusteringPrefix clustering()
@@ -42,15 +38,15 @@ public abstract class AbstractClusteringPrefix implements ClusteringPrefix
         return size;
     }
 
-    public void digest(Hasher hasher)
+    public void digest(Digest digest)
     {
         for (int i = 0; i < size(); i++)
         {
             ByteBuffer bb = get(i);
             if (bb != null)
-                HashingUtils.updateBytes(hasher, bb.duplicate());
+                digest.update(bb);
         }
-        HashingUtils.updateWithByte(hasher, kind().ordinal());
+        digest.updateWithByte(kind().ordinal());
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index 2d9198b..357d746 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.google.common.hash.Hasher;
-
 import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.marshal.CompositeType;
@@ -218,11 +216,11 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
     public ByteBuffer get(int i);
 
     /**
-     * Adds the data of this clustering prefix to the provided Hasher instance.
+     * Adds the data of this clustering prefix to the provided Digest instance.
      *
-     * @param hasher the Hasher instance to which to add this prefix.
+     * @param digest the Digest instance to which to add this prefix.
      */
-    public void digest(Hasher hasher);
+    public void digest(Digest digest);
 
     /**
      * The size of the data hold by this prefix.
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index bf9e174..f56072e 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -25,7 +25,6 @@ import java.nio.ByteBuffer;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
-import com.google.common.hash.Hasher;
 
 import net.nicoulaj.compilecommand.annotations.DontInline;
 import org.apache.cassandra.exceptions.UnknownColumnException;
@@ -37,7 +36,6 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.HashingUtils;
 import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.BTreeSearchIterator;
@@ -375,10 +373,10 @@ public class Columns extends AbstractCollection<ColumnMetadata> implements Colle
         return column -> iter.next(column) != null;
     }
 
-    public void digest(Hasher hasher)
+    public void digest(Digest digest)
     {
         for (ColumnMetadata c : this)
-            HashingUtils.updateBytes(hasher, c.name.bytes.duplicate());
+            digest.update(c.name.bytes);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index 14e846d..b2d9343 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -20,14 +20,12 @@ package org.apache.cassandra.db;
 import java.io.IOException;
 
 import com.google.common.base.Objects;
-import com.google.common.hash.Hasher;
 
 import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.HashingUtils;
 import org.apache.cassandra.utils.ObjectSizes;
 
 /**
@@ -80,12 +78,12 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
         return markedForDeleteAt() == Long.MIN_VALUE && localDeletionTime() == Integer.MAX_VALUE;
     }
 
-    public void digest(Hasher hasher)
+    public void digest(Digest digest)
     {
         // localDeletionTime is essentially metadata about the deletion time that tells us when it's ok to purge it.
         // It's thus intrinsically local information and shouldn't be part of the digest (which exists for
         // cross-node comparisons).
-        HashingUtils.updateWithLong(hasher, markedForDeleteAt());
+        digest.updateWithLong(markedForDeleteAt());
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/db/Digest.java b/src/java/org/apache/cassandra/db/Digest.java
new file mode 100644
index 0000000..bac6386
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/Digest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.utils.FastByteOperations;
+
+public class Digest
+{
+    private static final ThreadLocal<byte[]> localBuffer = ThreadLocal.withInitial(() -> new byte[4096]);
+
+    private final Hasher hasher;
+    private long inputBytes = 0;
+
+    @SuppressWarnings("deprecation")
+    private static Hasher md5()
+    {
+        return Hashing.md5().newHasher();
+    }
+
+    public static Digest forReadResponse()
+    {
+        return new Digest(md5());
+    }
+
+    public static Digest forSchema()
+    {
+        return new Digest(md5());
+    }
+
+    public static Digest forValidator()
+    {
+        // Uses a Hasher that concatenates the hash codes from 2 hash functions
+        // (murmur3_128) with different seeds to produce a 256-bit hash code
+        return new Digest(Hashing.concatenating(Hashing.murmur3_128(1000),
+                                                Hashing.murmur3_128(2000))
+                                 .newHasher());
+    }
+
+    public static Digest forRepairedDataTracking()
+    {
+        return new Digest(Hashing.crc32c().newHasher())
+        {
+            @Override
+            public Digest updateWithCounterContext(ByteBuffer context)
+            {
+                // for the purposes of repaired data tracking on the read path, exclude
+                // contexts with legacy shards as these may be irrevocably different on
+                // different replicas
+                if (CounterContext.instance().hasLegacyShards(context))
+                    return this;
+
+                return super.updateWithCounterContext(context);
+            }
+        };
+    }
+
+    Digest(Hasher hasher)
+    {
+        this.hasher = hasher;
+    }
+
+    public Digest update(byte[] input, int offset, int len)
+    {
+        hasher.putBytes(input, offset, len);
+        inputBytes += len;
+        return this;
+    }
+
+    /**
+     * Update the digest with the bytes from the supplied buffer. This does
+     * not modify the position of the supplied buffer, so callers are not
+     * required to duplicate() the source buffer before calling
+     */
+    public Digest update(ByteBuffer input)
+    {
+        return update(input, input.position(), input.remaining());
+    }
+
+    /**
+     * Update the digest with the bytes sliced from the supplied buffer. This does
+     * not modify the position of the supplied buffer, so callers are not
+     * required to duplicate() the source buffer before calling
+     */
+    private Digest update(ByteBuffer input, int pos, int len)
+    {
+        if (len <= 0)
+            return this;
+
+        if (input.hasArray())
+        {
+            byte[] b = input.array();
+            int ofs = input.arrayOffset();
+            hasher.putBytes(b, ofs + pos, len);
+            inputBytes += len;
+        }
+        else
+        {
+            byte[] tempArray = localBuffer.get();
+            while (len > 0)
+            {
+                int chunk = Math.min(len, tempArray.length);
+                FastByteOperations.copy(input, pos, tempArray, 0, chunk);
+                hasher.putBytes(tempArray, 0, chunk);
+                len -= chunk;
+                pos += chunk;
+                inputBytes += chunk;
+            }
+        }
+        return this;
+    }
+
+    /**
+     * Update the digest with the content of a counter context.
+     * Note that this skips the header entirely since the header information
+     * has local meaning only, while digests are meant for comparison across
+     * nodes. This means in particular that we always have:
+     *  updateDigest(ctx) == updateDigest(clearAllLocal(ctx))
+     */
+    public Digest updateWithCounterContext(ByteBuffer context)
+    {
+        // context can be empty due to the optimization from CASSANDRA-10657
+        if (!context.hasRemaining())
+            return this;
+
+        int pos = context.position() + CounterContext.headerLength(context);
+        int len = context.limit() - pos;
+        update(context, pos, len);
+        return this;
+    }
+
+    public Digest updateWithByte(int val)
+    {
+        hasher.putByte((byte) (val & 0xFF));
+        inputBytes++;
+        return this;
+    }
+
+    public Digest updateWithInt(int val)
+    {
+        hasher.putByte((byte) ((val >>> 24) & 0xFF));
+        hasher.putByte((byte) ((val >>> 16) & 0xFF));
+        hasher.putByte((byte) ((val >>>  8) & 0xFF));
+        hasher.putByte((byte) ((val >>> 0) & 0xFF));
+        inputBytes += 4;
+        return this;
+    }
+
+    public Digest updateWithLong(long val)
+    {
+        hasher.putByte((byte) ((val >>> 56) & 0xFF));
+        hasher.putByte((byte) ((val >>> 48) & 0xFF));
+        hasher.putByte((byte) ((val >>> 40) & 0xFF));
+        hasher.putByte((byte) ((val >>> 32) & 0xFF));
+        hasher.putByte((byte) ((val >>> 24) & 0xFF));
+        hasher.putByte((byte) ((val >>> 16) & 0xFF));
+        hasher.putByte((byte) ((val >>>  8) & 0xFF));
+        hasher.putByte((byte)  ((val >>> 0) & 0xFF));
+        inputBytes += 8;
+        return this;
+    }
+
+    public Digest updateWithBoolean(boolean val)
+    {
+        updateWithByte(val ? 0 : 1);
+        return this;
+    }
+
+    public byte[] digest()
+    {
+        return hasher.hash().asBytes();
+    }
+
+    public long inputBytes()
+    {
+        return inputBytes;
+    }
+}
+
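
A minimal sketch of how the fluent API added above is meant to be used (illustrative only; the partitionKey, timestamp and counterContext parameters below are hypothetical, not part of the patch):

    import java.nio.ByteBuffer;
    import org.apache.cassandra.db.Digest;

    class DigestUsageSketch
    {
        // forRepairedDataTracking() wraps a CRC32C hasher and ignores counter
        // contexts containing legacy shards, as the override above shows.
        static byte[] repairedDataDigest(ByteBuffer partitionKey, long timestamp, ByteBuffer counterContext)
        {
            Digest digest = Digest.forRepairedDataTracking();
            digest.update(partitionKey)                      // buffer position is left untouched
                  .updateWithLong(timestamp)
                  .updateWithCounterContext(counterContext); // no-op when legacy shards are present
            return digest.digest();
        }
    }
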
diff --git a/src/java/org/apache/cassandra/db/LivenessInfo.java b/src/java/org/apache/cassandra/db/LivenessInfo.java
index 1340c00..b1ea3f6 100644
--- a/src/java/org/apache/cassandra/db/LivenessInfo.java
+++ b/src/java/org/apache/cassandra/db/LivenessInfo.java
@@ -19,11 +19,8 @@ package org.apache.cassandra.db;
 
 import java.util.Objects;
 
-import com.google.common.hash.Hasher;
-
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.HashingUtils;
 
 /**
  * Stores the information relating to the liveness of the primary key columns of a row.
@@ -153,11 +150,11 @@ public class LivenessInfo
     /**
      * Adds this liveness information to the provided digest.
      *
-     * @param hasher the hasher digest to add this liveness information to.
+     * @param digest the digest to add this liveness information to.
      */
-    public void digest(Hasher hasher)
+    public void digest(Digest digest)
     {
-        HashingUtils.updateWithLong(hasher, timestamp());
+        digest.updateWithLong(timestamp());
     }
 
     /**
@@ -331,11 +328,11 @@ public class LivenessInfo
         }
 
         @Override
-        public void digest(Hasher hasher)
+        public void digest(Digest digest)
         {
-            super.digest(hasher);
-            HashingUtils.updateWithInt(hasher, localExpirationTime);
-            HashingUtils.updateWithInt(hasher, ttl);
+            super.digest(digest);
+            digest.updateWithInt(localExpirationTime)
+                  .updateWithInt(ttl);
         }
 
         @Override
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 68ce2ea..9485abc 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -29,8 +29,6 @@ import javax.annotation.Nullable;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +36,6 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.net.MessageFlag;
 import org.apache.cassandra.net.Verb;
-import org.apache.cassandra.utils.ApproximateTime;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.RTBoundCloser;
@@ -67,7 +64,6 @@ import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.HashingUtils;
 
 import static com.google.common.collect.Iterables.any;
 import static com.google.common.collect.Iterables.filter;
@@ -782,14 +778,14 @@ public abstract class ReadCommand extends AbstractReadQuery
 
     private static class RepairedDataInfo
     {
-        private Hasher hasher;
+        private Digest hasher;
         private boolean isConclusive = true;
 
         ByteBuffer getDigest()
         {
             return hasher == null
                    ? ByteBufferUtil.EMPTY_BYTE_BUFFER
-                   : ByteBuffer.wrap(getHasher().hash().asBytes());
+                   : ByteBuffer.wrap(getHasher().digest());
         }
 
         boolean isConclusive()
@@ -804,7 +800,7 @@ public abstract class ReadCommand extends AbstractReadQuery
 
         void trackPartitionKey(DecoratedKey key)
         {
-            HashingUtils.updateBytes(getHasher(), key.getKey().duplicate());
+            getHasher().update(key.getKey());
         }
 
         void trackDeletion(DeletionTime deletion)
@@ -822,10 +818,10 @@ public abstract class ReadCommand extends AbstractReadQuery
             row.digest(getHasher());
         }
 
-        private Hasher getHasher()
+        private Digest getHasher()
         {
             if (hasher == null)
-                hasher = Hashing.crc32c().newHasher();
+                hasher = Digest.forRepairedDataTracking();
 
             return hasher;
         }
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 2ddb6a7..affbbbe 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -21,7 +21,6 @@ import java.io.*;
 import java.nio.ByteBuffer;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.hash.Hasher;
 
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.partitions.*;
@@ -34,7 +33,6 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.HashingUtils;
 
 public abstract class ReadResponse
 {
@@ -124,9 +122,9 @@ public abstract class ReadResponse
 
     protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, ReadCommand command)
     {
-        Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
-        UnfilteredPartitionIterators.digest(iterator, hasher, command.digestVersion());
-        return ByteBuffer.wrap(hasher.hash().asBytes());
+        Digest digest = Digest.forReadResponse();
+        UnfilteredPartitionIterators.digest(iterator, digest, command.digestVersion());
+        return ByteBuffer.wrap(digest.digest());
     }
 
     private static class DigestResponse extends ReadResponse
diff --git a/src/java/org/apache/cassandra/db/context/CounterContext.java b/src/java/org/apache/cassandra/db/context/CounterContext.java
index 01c2f1d..6a618ca 100644
--- a/src/java/org/apache/cassandra/db/context/CounterContext.java
+++ b/src/java/org/apache/cassandra/db/context/CounterContext.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.hash.Hasher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -168,7 +167,7 @@ public class CounterContext
         return state.context;
     }
 
-    private static int headerLength(ByteBuffer context)
+    public static int headerLength(ByteBuffer context)
     {
         return HEADER_SIZE_LENGTH + Math.abs(context.getShort(context.position())) * HEADER_ELT_LENGTH;
     }
@@ -684,23 +683,6 @@ public class CounterContext
     }
 
     /**
-     * Update a {@link Hasher} with the content of a context.
-     * Note that this skips the header entirely since the header information
-     * has local meaning only, while digests are meant for comparison across
-     * nodes. This means in particular that we always have:
-     *  updateDigest(ctx) == updateDigest(clearAllLocal(ctx))
-     */
-    public void updateDigest(Hasher hasher, ByteBuffer context)
-    {
-        // context can be empty due to the optimization from CASSANDRA-10657
-        if (!context.hasRemaining())
-            return;
-        ByteBuffer dup = context.duplicate();
-        dup.position(context.position() + headerLength(context));
-        HashingUtils.updateBytes(hasher, dup);
-    }
-
-    /**
      * Returns the clock and the count associated with the local counter id, or (0, 0) if no such shard is present.
      */
     public ClockAndCount getLocalClockAndCount(ByteBuffer context)
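
The removed updateDigest logic now lives in Digest.updateWithCounterContext (see the new Digest.java above). A sketch of the invariant its javadoc states, assuming the CounterContext.clearAllLocal method referenced there:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import org.apache.cassandra.db.Digest;
    import org.apache.cassandra.db.context.CounterContext;

    class CounterDigestInvariantSketch
    {
        // The context header is skipped when digesting, so clearing local shards
        // must leave the digest unchanged: updateDigest(ctx) == updateDigest(clearAllLocal(ctx)).
        static boolean headerIsExcluded(ByteBuffer ctx)
        {
            byte[] a = Digest.forReadResponse().updateWithCounterContext(ctx).digest();
            byte[] b = Digest.forReadResponse()
                             .updateWithCounterContext(CounterContext.instance().clearAllLocal(ctx))
                             .digest();
            return Arrays.equals(a, b);
        }
    }
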
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
index 74a61d6..874db05 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.partitions;
 import java.util.*;
 
 import org.apache.cassandra.db.EmptyIterators;
-import org.apache.cassandra.db.transform.FilteredPartitions;
 import org.apache.cassandra.db.transform.MorePartitions;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.utils.AbstractIterator;
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 7990474..945bcb4 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -21,8 +21,6 @@ import java.io.IOError;
 import java.io.IOException;
 import java.util.*;
 
-import com.google.common.hash.Hasher;
-
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
@@ -259,16 +257,16 @@ public abstract class UnfilteredPartitionIterators
      * Caller must close the provided iterator.
      *
      * @param iterator the iterator to digest.
-     * @param hasher the {@link Hasher} to use for the digest.
+     * @param digest the {@link Digest} to use.
      * @param version the messaging protocol to use when producing the digest.
      */
-    public static void digest(UnfilteredPartitionIterator iterator, Hasher hasher, int version)
+    public static void digest(UnfilteredPartitionIterator iterator, Digest digest, int version)
     {
         while (iterator.hasNext())
         {
             try (UnfilteredRowIterator partition = iterator.next())
             {
-                    UnfilteredRowIterators.digest(partition, hasher, version);
+                UnfilteredRowIterators.digest(partition, digest, version);
             }
         }
     }
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index bfe7396..51c9ff4 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -20,16 +20,14 @@ package org.apache.cassandra.db.rows;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
-import com.google.common.hash.Hasher;
-
-import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.db.Digest;
 import org.apache.cassandra.db.DeletionPurger;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.HashingUtils;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
@@ -120,22 +118,18 @@ public abstract class AbstractCell extends Cell
                + (path == null ? 0 : path.dataSize());
     }
 
-    public void digest(Hasher hasher)
+    public void digest(Digest digest)
     {
         if (isCounterCell())
-        {
-            CounterContext.instance().updateDigest(hasher, value());
-        }
+            digest.updateWithCounterContext(value());
         else
-        {
-            HashingUtils.updateBytes(hasher, value().duplicate());
-        }
+            digest.update(value());
 
-        HashingUtils.updateWithLong(hasher, timestamp());
-        HashingUtils.updateWithInt(hasher, ttl());
-        HashingUtils.updateWithBoolean(hasher, isCounterCell());
+        digest.updateWithLong(timestamp())
+              .updateWithInt(ttl())
+              .updateWithBoolean(isCounterCell());
         if (path() != null)
-            path().digest(hasher);
+            path().digest(digest);
     }
 
     public void validate()
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index f719db5..2018d4e 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -17,21 +17,18 @@
 package org.apache.cassandra.db.rows;
 
 import java.nio.ByteBuffer;
-import java.util.AbstractCollection;
 import java.util.Objects;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import com.google.common.collect.Iterables;
-import com.google.common.hash.Hasher;
 
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.HashingUtils;
 
 /**
  * Base abstract class for {@code Row} implementations.
@@ -61,16 +58,16 @@ public abstract class AbstractRow implements Row
         return clustering() == Clustering.STATIC_CLUSTERING;
     }
 
-    public void digest(Hasher hasher)
+    public void digest(Digest digest)
     {
-        HashingUtils.updateWithByte(hasher, kind().ordinal());
-        clustering().digest(hasher);
+        digest.updateWithByte(kind().ordinal());
+        clustering().digest(digest);
 
-        deletion().digest(hasher);
-        primaryKeyLivenessInfo().digest(hasher);
+        deletion().digest(digest);
+        primaryKeyLivenessInfo().digest(digest);
 
         for (ColumnData cd : this)
-            cd.digest(hasher);
+            cd.digest(digest);
     }
 
     public void validateData(TableMetadata metadata)
diff --git a/src/java/org/apache/cassandra/db/rows/CellPath.java b/src/java/org/apache/cassandra/db/rows/CellPath.java
index 94fa8e7..1bf8b8f 100644
--- a/src/java/org/apache/cassandra/db/rows/CellPath.java
+++ b/src/java/org/apache/cassandra/db/rows/CellPath.java
@@ -21,12 +21,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
-import com.google.common.hash.Hasher;
-
+import org.apache.cassandra.db.Digest;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.HashingUtils;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
@@ -56,10 +54,10 @@ public abstract class CellPath
         return size;
     }
 
-    public void digest(Hasher hasher)
+    public void digest(Digest digest)
     {
         for (int i = 0; i < size(); i++)
-            HashingUtils.updateBytes(hasher, get(i).duplicate());
+            digest.update(get(i));
     }
 
     public abstract CellPath copy(AbstractAllocator allocator);
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java
index f2da132..e5f5550 100644
--- a/src/java/org/apache/cassandra/db/rows/ColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -19,8 +19,7 @@ package org.apache.cassandra.db.rows;
 
 import java.util.Comparator;
 
-import com.google.common.hash.Hasher;
-
+import org.apache.cassandra.db.Digest;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.DeletionPurger;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -75,9 +74,9 @@ public abstract class ColumnData
     /**
      * Adds the data to the provided digest.
      *
-     * @param hasher the {@link Hasher} to add the data to.
+     * @param digest the {@link Digest} to add the data to.
      */
-    public abstract void digest(Hasher hasher);
+    public abstract void digest(Digest digest);
 
     /**
      * Returns a copy of the data where all timestamps for live data have replaced by {@code newTimestamp} and
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index aa1150c..832167f 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -22,10 +22,10 @@ import java.util.Iterator;
 import java.util.Objects;
 
 import com.google.common.base.Function;
-import com.google.common.hash.Hasher;
 
 import org.apache.cassandra.db.DeletionPurger;
 import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Digest;
 import org.apache.cassandra.db.LivenessInfo;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.marshal.ByteType;
@@ -129,13 +129,13 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell>
             cell.validate();
     }
 
-    public void digest(Hasher hasher)
+    public void digest(Digest digest)
     {
         if (!complexDeletion.isLive())
-            complexDeletion.digest(hasher);
+            complexDeletion.digest(digest);
 
         for (Cell cell : this)
-            cell.digest(hasher);
+            cell.digest(digest);
     }
 
     public boolean hasInvalidDeletions()
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
index 094cf72..51d8264 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.db.rows;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
-import com.google.common.hash.Hasher;
-
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
@@ -132,10 +130,10 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker<Clus
         return new RangeTombstoneBoundMarker(clustering(), newDeletionTime);
     }
 
-    public void digest(Hasher hasher)
+    public void digest(Digest digest)
     {
-        bound.digest(hasher);
-        deletion.digest(hasher);
+        bound.digest(digest);
+        deletion.digest(digest);
     }
 
     public String toString(TableMetadata metadata)
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
index 1cbf31c..6a931c9 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.db.rows;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
-import com.google.common.hash.Hasher;
-
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
@@ -150,11 +148,11 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker<C
         return new RangeTombstoneBoundMarker(openBound(reversed), openDeletionTime(reversed));
     }
 
-    public void digest(Hasher hasher)
+    public void digest(Digest digest)
     {
-        bound.digest(hasher);
-        endDeletion.digest(hasher);
-        startDeletion.digest(hasher);
+        bound.digest(digest);
+        endDeletion.digest(digest);
+        startDeletion.digest(digest);
     }
 
     public String toString(TableMetadata metadata)
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index 0174adc..6f0b43e 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -21,14 +21,12 @@ import java.util.*;
 import java.util.function.Consumer;
 
 import com.google.common.base.Predicate;
-import com.google.common.hash.Hasher;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.paxos.Commit;
-import org.apache.cassandra.utils.HashingUtils;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTree;
@@ -391,10 +389,10 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
             return time.deletes(cell);
         }
 
-        public void digest(Hasher hasher)
+        public void digest(Digest digest)
         {
-            time.digest(hasher);
-            HashingUtils.updateWithBoolean(hasher, isShadowable);
+            time.digest(digest);
+            digest.updateWithBoolean(isShadowable);
         }
 
         public int dataSize()
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java
index d340777..640cbc8 100644
--- a/src/java/org/apache/cassandra/db/rows/RowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java
@@ -17,14 +17,13 @@
  */
 package org.apache.cassandra.db.rows;
 
-import com.google.common.hash.Hasher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.Digest;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.utils.HashingUtils;
 
 /**
  * Static methods to work with row iterators.
@@ -35,20 +34,20 @@ public abstract class RowIterators
 
     private RowIterators() {}
 
-    public static void digest(RowIterator iterator, Hasher hasher)
+    public static void digest(RowIterator iterator, Digest digest)
     {
         // TODO: we're not computing the digest the same way as old nodes do. This is
         // currently ok as this is only used for the schema digest and there is no exchange
         // of schema digests between different versions. If this changes, however,
         // we'll need to agree on a version.
-        HashingUtils.updateBytes(hasher, iterator.partitionKey().getKey().duplicate());
-        iterator.columns().regulars.digest(hasher);
-        iterator.columns().statics.digest(hasher);
-        HashingUtils.updateWithBoolean(hasher, iterator.isReverseOrder());
-        iterator.staticRow().digest(hasher);
+        digest.update(iterator.partitionKey().getKey());
+        iterator.columns().regulars.digest(digest);
+        iterator.columns().statics.digest(digest);
+        digest.updateWithBoolean(iterator.isReverseOrder());
+        iterator.staticRow().digest(digest);
 
         while (iterator.hasNext())
-            iterator.next().digest(hasher);
+            iterator.next().digest(digest);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
index 81b63b7..f5c5ed0 100644
--- a/src/java/org/apache/cassandra/db/rows/Unfiltered.java
+++ b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
@@ -17,8 +17,7 @@
  */
 package org.apache.cassandra.db.rows;
 
-import com.google.common.hash.Hasher;
-
+import org.apache.cassandra.db.Digest;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.Clusterable;
 
@@ -39,11 +38,11 @@ public interface Unfiltered extends Clusterable
     public Kind kind();
 
     /**
-     * Digest the atom using the provided {@link Hasher}.
+     * Digest the atom using the provided {@link Digest}.
      *
-     * @param hasher the {@see Hasher} to use.
+     * @param digest the {@see Digest} to use.
      */
-    public void digest(Hasher hasher);
+    public void digest(Digest digest);
 
     /**
      * Validate the data of this atom.
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 21e1954..b0af16d 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.rows;
 
 import java.util.*;
 
-import com.google.common.hash.Hasher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,7 +32,6 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.HashingUtils;
 import org.apache.cassandra.utils.IMergeIterator;
 import org.apache.cassandra.utils.MergeIterator;
 
@@ -181,14 +179,14 @@ public abstract class UnfilteredRowIterators
      * Digests the partition represented by the provided iterator.
      *
      * @param iterator the iterator to digest.
-     * @param hasher the {@link Hasher} to use for the digest.
+     * @param digest the {@link Digest} to use.
      * @param version the messaging protocol to use when producing the digest.
      */
-    public static void digest(UnfilteredRowIterator iterator, Hasher hasher, int version)
+    public static void digest(UnfilteredRowIterator iterator, Digest digest, int version)
     {
-        HashingUtils.updateBytes(hasher, iterator.partitionKey().getKey().duplicate());
-        iterator.partitionLevelDeletion().digest(hasher);
-        iterator.columns().regulars.digest(hasher);
+        digest.update(iterator.partitionKey().getKey());
+        iterator.partitionLevelDeletion().digest(digest);
+        iterator.columns().regulars.digest(digest);
         // When serializing an iterator, we skip the static columns if the iterator has no static row, even if the
         // columns() object itself has some (the columns() is a superset of what the iterator actually contains, and
         // will correspond to the queried columns pre-serialization). So we must avoid taking the static column names
@@ -200,14 +198,14 @@ public abstract class UnfilteredRowIterators
         // different), but removing them entirely is strictly speaking a breaking change (it would create mismatches on
         // upgrade), so we can only do so on the next protocol version bump.
         if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW)
-            iterator.columns().statics.digest(hasher);
-        HashingUtils.updateWithBoolean(hasher, iterator.isReverseOrder());
-        iterator.staticRow().digest(hasher);
+            iterator.columns().statics.digest(digest);
+        digest.updateWithBoolean(iterator.isReverseOrder());
+        iterator.staticRow().digest(digest);
 
         while (iterator.hasNext())
         {
             Unfiltered unfiltered = iterator.next();
-            unfiltered.digest(hasher);
+            unfiltered.digest(digest);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index 0457a89..241b785 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.GuidGenerator;
-import org.apache.cassandra.utils.HashingUtils;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.Pair;
 
@@ -61,7 +60,7 @@ public class RandomPartitioner implements IPartitioner
         @Override
         protected MessageDigest initialValue()
         {
-            return HashingUtils.newMessageDigest("MD5");
+            return FBUtilities.newMessageDigest("MD5");
         }
 
         @Override
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 341c5b3..2f71729 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Digest;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.dht.Range;
@@ -195,171 +196,15 @@ public class Validator implements Runnable
         return range.contains(t);
     }
 
-    /**
-     * Hasher that concatenates the hash code from 2 hash functions (murmur3_128) with different
-     * seeds and counts the number of bytes we hashed.
-     *
-     * Everything hashed by this class is hashed by both hash functions and the
-     * resulting hashcode is a concatenation of the output bytes from each.
-     *
-     * Idea from Guavas Hashing.ConcatenatedHashFunction, but that is package-private so we can't use it
-     */
-    @VisibleForTesting
-    static class CountingHasher implements Hasher
-    {
-        @VisibleForTesting
-        static final HashFunction[] hashFunctions = new HashFunction[2];
-
-        static
-        {
-            for (int i = 0; i < hashFunctions.length; i++)
-                hashFunctions[i] = Hashing.murmur3_128(i * 1000);
-        }
-        private long count;
-        private final int bits;
-        private final Hasher[] underlying = new Hasher[2];
-
-        CountingHasher()
-        {
-            int bits = 0;
-            for (int i = 0; i < underlying.length; i++)
-            {
-                this.underlying[i] = hashFunctions[i].newHasher();
-                bits += hashFunctions[i].bits();
-            }
-            this.bits = bits;
-        }
-
-        public Hasher putByte(byte b)
-        {
-            count += 1;
-            for (Hasher h : underlying)
-                h.putByte(b);
-            return this;
-        }
-
-        public Hasher putBytes(byte[] bytes)
-        {
-            count += bytes.length;
-            for (Hasher h : underlying)
-                h.putBytes(bytes);
-            return this;
-        }
-
-        public Hasher putBytes(byte[] bytes, int offset, int length)
-        {
-            count += length;
-            for (Hasher h : underlying)
-                h.putBytes(bytes, offset, length);
-            return this;
-        }
-
-        public Hasher putBytes(ByteBuffer byteBuffer)
-        {
-            count += byteBuffer.remaining();
-            for (Hasher h : underlying)
-                h.putBytes(byteBuffer.duplicate());
-            return this;
-        }
-
-        public Hasher putShort(short i)
-        {
-            count += Short.BYTES;
-            for (Hasher h : underlying)
-                h.putShort(i);
-            return this;
-        }
-
-        public Hasher putInt(int i)
-        {
-            count += Integer.BYTES;
-            for (Hasher h : underlying)
-                h.putInt(i);
-            return this;
-        }
-
-        public Hasher putLong(long l)
-        {
-            count += Long.BYTES;
-            for (Hasher h : underlying)
-                h.putLong(l);
-            return this;
-        }
-
-        public Hasher putFloat(float v)
-        {
-            count += Float.BYTES;
-            for (Hasher h : underlying)
-                h.putFloat(v);
-            return this;
-        }
-
-        public Hasher putDouble(double v)
-        {
-            count += Double.BYTES;
-            for (Hasher h : underlying)
-                h.putDouble(v);
-            return this;
-        }
-
-        public Hasher putBoolean(boolean b)
-        {
-            count += Byte.BYTES;
-            for (Hasher h : underlying)
-                h.putBoolean(b);
-            return this;
-        }
-
-        public Hasher putChar(char c)
-        {
-            count += Character.BYTES;
-            for (Hasher h : underlying)
-                h.putChar(c);
-            return this;
-        }
-
-        public Hasher putUnencodedChars(CharSequence charSequence)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        public Hasher putString(CharSequence charSequence, Charset charset)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        public <T> Hasher putObject(T t, Funnel<? super T> funnel)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        public HashCode hash()
-        {
-            byte[] res = new byte[bits / 8];
-            int i = 0;
-            for (Hasher hasher : underlying)
-            {
-                HashCode newHash = hasher.hash();
-                i += newHash.writeBytesTo(res, i, newHash.bits() / 8);
-            }
-            return HashCode.fromBytes(res);
-        }
-
-        public long getCount()
-        {
-            return count;
-        }
-    }
-
     private MerkleTree.RowHash rowHash(UnfilteredRowIterator partition)
     {
         validated++;
         // MerkleTree uses XOR internally, so we want lots of output bits here
-        CountingHasher hasher = new CountingHasher();
-        UnfilteredRowIterators.digest(partition, hasher, MessagingService.current_version);
+        Digest digest = Digest.forValidator();
+        UnfilteredRowIterators.digest(partition, digest, MessagingService.current_version);
         // only return a new hash for the merkle tree if the digest was updated - see CASSANDRA-8979
-        return hasher.count > 0
-             ? new MerkleTree.RowHash(partition.partitionKey().getToken(), hasher.hash().asBytes(), hasher.count)
+        return digest.inputBytes() > 0
+             ? new MerkleTree.RowHash(partition.partitionKey().getToken(), digest.digest(), digest.inputBytes())
              : null;
     }
 
diff --git a/src/java/org/apache/cassandra/schema/SchemaConstants.java b/src/java/org/apache/cassandra/schema/SchemaConstants.java
index e51a31b..82bd2cb 100644
--- a/src/java/org/apache/cassandra/schema/SchemaConstants.java
+++ b/src/java/org/apache/cassandra/schema/SchemaConstants.java
@@ -26,7 +26,7 @@ import java.util.regex.Pattern;
 
 import com.google.common.collect.ImmutableSet;
 
-import org.apache.cassandra.utils.HashingUtils;
+import org.apache.cassandra.db.Digest;
 
 public final class SchemaConstants
 {
@@ -66,7 +66,7 @@ public final class SchemaConstants
 
     static
     {
-        emptyVersion = UUID.nameUUIDFromBytes(HashingUtils.CURRENT_HASH_FUNCTION.newHasher().hash().asBytes());
+        emptyVersion = UUID.nameUUIDFromBytes(Digest.forSchema().digest());
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index f31cf37..76bda0f 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -26,7 +26,6 @@ import java.util.stream.Collectors;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.*;
 import com.google.common.collect.Maps;
-import com.google.common.hash.Hasher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +47,6 @@ import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.HashingUtils;
 
 import static java.lang.String.format;
 
@@ -354,8 +352,7 @@ public final class SchemaKeyspace
      */
     static UUID calculateSchemaDigest()
     {
-        Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
-
+        Digest digest = Digest.forSchema();
         for (String table : ALL)
         {
             // Due to CASSANDRA-11050 we want to exclude DROPPED_COLUMNS for schema digest computation. We can and
@@ -372,12 +369,12 @@ public final class SchemaKeyspace
                     try (RowIterator partition = schema.next())
                     {
                         if (!isSystemKeyspaceSchemaPartition(partition.partitionKey()))
-                            RowIterators.digest(partition, hasher);
+                            RowIterators.digest(partition, digest);
                     }
                 }
             }
         }
-        return UUID.nameUUIDFromBytes(hasher.hash().asBytes());
+        return UUID.nameUUIDFromBytes(digest.digest());
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index f0d9132..1797087 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -22,6 +22,8 @@ import java.lang.reflect.Field;
 import java.math.BigInteger;
 import java.net.*;
 import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.zip.CRC32;
@@ -101,6 +103,18 @@ public class FBUtilities
 
     public static final int MAX_UNSIGNED_SHORT = 0xFFFF;
 
+    public static MessageDigest newMessageDigest(String algorithm)
+    {
+        try
+        {
+            return MessageDigest.getInstance(algorithm);
+        }
+        catch (NoSuchAlgorithmException nsae)
+        {
+            throw new RuntimeException("the requested digest algorithm (" + algorithm + ") is not available", nsae);
+        }
+    }
+
     /**
      * Please use getJustBroadcastAddress instead. You need this only when you have to listen/connect. It's also missing
      * the port you should be using. 99% of code doesn't want this.
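
The helper relocated into FBUtilities behaves exactly as its HashingUtils predecessor: it wraps
the checked NoSuchAlgorithmException so call sites such as MD5Digest (below) stay simple. A short
usage sketch, illustrative only:

    import java.security.MessageDigest;
    import org.apache.cassandra.utils.FBUtilities;

    class NewMessageDigestExample
    {
        public static void main(String[] args)
        {
            // An unknown algorithm surfaces as a RuntimeException rather than a checked exception.
            MessageDigest md5 = FBUtilities.newMessageDigest("MD5");
            md5.update("hello".getBytes());
            System.out.println(md5.digest().length); // 16 bytes for an MD5 digest
        }
    }
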
diff --git a/src/java/org/apache/cassandra/utils/HashingUtils.java b/src/java/org/apache/cassandra/utils/HashingUtils.java
deleted file mode 100644
index 9e65a5d..0000000
--- a/src/java/org/apache/cassandra/utils/HashingUtils.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.utils;
-
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-
-public class HashingUtils
-{
-    public static final HashFunction CURRENT_HASH_FUNCTION = Hashing.md5();
-
-    public static MessageDigest newMessageDigest(String algorithm)
-    {
-        try
-        {
-            return MessageDigest.getInstance(algorithm);
-        }
-        catch (NoSuchAlgorithmException nsae)
-        {
-            throw new RuntimeException("the requested digest algorithm (" + algorithm + ") is not available", nsae);
-        }
-    }
-
-    public static void updateBytes(Hasher hasher, ByteBuffer input)
-    {
-        if (!input.hasRemaining())
-            return;
-
-        if (input.hasArray())
-        {
-            byte[] b = input.array();
-            int ofs = input.arrayOffset();
-            int pos = input.position();
-            int lim = input.limit();
-            hasher.putBytes(b, ofs + pos, lim - pos);
-            input.position(lim);
-        }
-        else
-        {
-            int len = input.remaining();
-            int n = Math.min(len, 1 << 12); // either the remaining amount or 4kb
-            byte[] tempArray = new byte[n];
-            while (len > 0)
-            {
-                int chunk = Math.min(len, tempArray.length);
-                input.get(tempArray, 0, chunk);
-                hasher.putBytes(tempArray, 0, chunk);
-                len -= chunk;
-            }
-        }
-    }
-
-    public static void updateWithShort(Hasher hasher, int val)
-    {
-        hasher.putByte((byte) ((val >> 8) & 0xFF));
-        hasher.putByte((byte) (val & 0xFF));
-    }
-
-    public static void updateWithByte(Hasher hasher, int val)
-    {
-        hasher.putByte((byte) (val & 0xFF));
-    }
-
-    public static void updateWithInt(Hasher hasher, int val)
-    {
-        hasher.putByte((byte) ((val >>> 24) & 0xFF));
-        hasher.putByte((byte) ((val >>> 16) & 0xFF));
-        hasher.putByte((byte) ((val >>>  8) & 0xFF));
-        hasher.putByte((byte) ((val >>> 0) & 0xFF));
-    }
-
-    public static void updateWithLong(Hasher hasher, long val)
-    {
-        hasher.putByte((byte) ((val >>> 56) & 0xFF));
-        hasher.putByte((byte) ((val >>> 48) & 0xFF));
-        hasher.putByte((byte) ((val >>> 40) & 0xFF));
-        hasher.putByte((byte) ((val >>> 32) & 0xFF));
-        hasher.putByte((byte) ((val >>> 24) & 0xFF));
-        hasher.putByte((byte) ((val >>> 16) & 0xFF));
-        hasher.putByte((byte) ((val >>>  8) & 0xFF));
-        hasher.putByte((byte)  ((val >>> 0) & 0xFF));
-    }
-
-    public static void updateWithBoolean(Hasher hasher, boolean val)
-    {
-        updateWithByte(hasher, val ? 0 : 1);
-    }
-}
diff --git a/src/java/org/apache/cassandra/utils/MD5Digest.java b/src/java/org/apache/cassandra/utils/MD5Digest.java
index 5c0c1de..d542991 100644
--- a/src/java/org/apache/cassandra/utils/MD5Digest.java
+++ b/src/java/org/apache/cassandra/utils/MD5Digest.java
@@ -46,7 +46,7 @@ public class MD5Digest
         @Override
         protected MessageDigest initialValue()
         {
-            return HashingUtils.newMessageDigest("MD5");
+            return FBUtilities.newMessageDigest("MD5");
         }
 
         @Override
diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java b/src/java/org/apache/cassandra/utils/UUIDGen.java
index 103042d..c83e292 100644
--- a/src/java/org/apache/cassandra/utils/UUIDGen.java
+++ b/src/java/org/apache/cassandra/utils/UUIDGen.java
@@ -403,15 +403,35 @@ public class UUIDGen
         long pid = NativeLibrary.getProcessID();
         if (pid < 0)
             pid = new Random(System.currentTimeMillis()).nextLong();
-        HashingUtils.updateWithLong(hasher, pid);
+        updateWithLong(hasher, pid);
 
         ClassLoader loader = UUIDGen.class.getClassLoader();
         int loaderId = loader != null ? System.identityHashCode(loader) : 0;
-        HashingUtils.updateWithInt(hasher, loaderId);
+        updateWithInt(hasher, loaderId);
 
         return hasher.hash().asBytes();
     }
 
+    private static void updateWithInt(Hasher hasher, int val)
+    {
+        hasher.putByte((byte) ((val >>> 24) & 0xFF));
+        hasher.putByte((byte) ((val >>> 16) & 0xFF));
+        hasher.putByte((byte) ((val >>>  8) & 0xFF));
+        hasher.putByte((byte) ((val >>> 0) & 0xFF));
+    }
+
+    public static void updateWithLong(Hasher hasher, long val)
+    {
+        hasher.putByte((byte) ((val >>> 56) & 0xFF));
+        hasher.putByte((byte) ((val >>> 48) & 0xFF));
+        hasher.putByte((byte) ((val >>> 40) & 0xFF));
+        hasher.putByte((byte) ((val >>> 32) & 0xFF));
+        hasher.putByte((byte) ((val >>> 24) & 0xFF));
+        hasher.putByte((byte) ((val >>> 16) & 0xFF));
+        hasher.putByte((byte) ((val >>>  8) & 0xFF));
+        hasher.putByte((byte)  ((val >>> 0) & 0xFF));
+    }
+
     /**
      * Helper function used exclusively by UUIDGen to create
      **/
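
The updateWithInt/updateWithLong helpers retained here write their argument one byte at a time,
most significant byte first, so the hasher sees the same bytes a big-endian ByteBuffer would
produce. A quick stand-alone check of that equivalence (illustrative, not part of the patch):

    import java.nio.ByteBuffer;
    import com.google.common.hash.Hasher;
    import com.google.common.hash.Hashing;

    class BigEndianHashCheck
    {
        public static void main(String[] args)
        {
            long val = 0x0102030405060708L;

            // Byte-at-a-time, most significant byte first, as in updateWithLong.
            Hasher byByte = Hashing.md5().newHasher();
            for (int shift = 56; shift >= 0; shift -= 8)
                byByte.putByte((byte) ((val >>> shift) & 0xFF));

            // The 8 big-endian bytes produced by ByteBuffer.putLong.
            Hasher byBuffer = Hashing.md5().newHasher();
            byBuffer.putBytes(ByteBuffer.allocate(8).putLong(val).array());

            System.out.println(byByte.hash().equals(byBuffer.hash())); // true
        }
    }
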
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index 0852312..7ed8a60 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -23,14 +23,13 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import com.google.common.hash.Hasher;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.github.benmanes.caffeine.cache.Weigher;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.db.Digest;
 import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.UTF8Type;
@@ -45,8 +44,8 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.HashingUtils;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
@@ -105,11 +104,11 @@ public class CacheProviderTest
     private void assertDigests(IRowCacheEntry one, CachedBTreePartition two)
     {
         assertTrue(one instanceof CachedBTreePartition);
-        Hasher h1 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
-        Hasher h2 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
-        UnfilteredRowIterators.digest(((CachedBTreePartition) one).unfilteredIterator(), h1, MessagingService.current_version);
-        UnfilteredRowIterators.digest(((CachedBTreePartition) two).unfilteredIterator(), h2, MessagingService.current_version);
-        Assert.assertEquals(h1.hash(), h2.hash());
+        Digest d1 = Digest.forReadResponse();
+        Digest d2 = Digest.forReadResponse();
+        UnfilteredRowIterators.digest(((CachedBTreePartition) one).unfilteredIterator(), d1, MessagingService.current_version);
+        UnfilteredRowIterators.digest(((CachedBTreePartition) two).unfilteredIterator(), d2, MessagingService.current_version);
+        assertArrayEquals(d1.digest(), d2.digest());
     }
 
     private void concurrentCase(final CachedBTreePartition partition, final ICache<MeasureableString, IRowCacheEntry> cache) throws InterruptedException
diff --git a/test/unit/org/apache/cassandra/db/CounterCellTest.java b/test/unit/org/apache/cassandra/db/CounterCellTest.java
index 36f8c92..4ce9802 100644
--- a/test/unit/org/apache/cassandra/db/CounterCellTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCellTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
 
-import com.google.common.hash.Hasher;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -263,8 +262,8 @@ public class CounterCellTest
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
         ByteBuffer col = ByteBufferUtil.bytes("val");
 
-        Hasher hasher1 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
-        Hasher hasher2 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
+        Digest digest1 = Digest.forReadResponse();
+        Digest digest2 = Digest.forReadResponse();
 
         CounterContext.ContextState state = CounterContext.ContextState.allocate(0, 2, 2);
         state.writeRemote(CounterId.fromInt(1), 4L, 4L);
@@ -277,10 +276,10 @@ public class CounterCellTest
         ColumnMetadata cDef = cfs.metadata().getColumn(col);
         Cell cleared = BufferCell.live(cDef, 5, CounterContext.instance().clearAllLocal(state.context));
 
-        original.digest(hasher1);
-        cleared.digest(hasher2);
+        original.digest(digest1);
+        cleared.digest(digest2);
 
-        Assert.assertEquals(hasher1.hash(), hasher2.hash());
+        assertArrayEquals(digest1.digest(), digest2.digest());
     }
 
     @Test
@@ -297,9 +296,9 @@ public class CounterCellTest
         builder.addCell(emptyCell);
         Row row = builder.build();
 
-        Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
-        row.digest(hasher);
-        assertNotNull(hasher.hash());
+        Digest digest = Digest.forReadResponse();
+        row.digest(digest);
+        assertNotNull(digest.digest());
     }
 
 }
diff --git a/test/unit/org/apache/cassandra/db/DigestTest.java b/test/unit/org/apache/cassandra/db/DigestTest.java
new file mode 100644
index 0000000..4fd12d0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/DigestTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import com.google.common.hash.Hashing;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class DigestTest
+{
+    private static final Logger logger = LoggerFactory.getLogger(DigestTest.class);
+
+    @Test
+    public void hashEmptyBytes() throws Exception {
+        Assert.assertArrayEquals(Hex.hexToBytes("d41d8cd98f00b204e9800998ecf8427e"),
+                                 Digest.forReadResponse().update(ByteBufferUtil.EMPTY_BYTE_BUFFER).digest());
+    }
+
+    @Test
+    public void hashBytesFromTinyDirectByteBuffer() throws Exception {
+        ByteBuffer directBuf = ByteBuffer.allocateDirect(8);
+        directBuf.putLong(5L);
+        directBuf.position(0);
+        assertArrayEquals(Hex.hexToBytes("aaa07454fa93ed2d37b4c5da9f2f87fd"),
+                                         Digest.forReadResponse().update(directBuf).digest());
+    }
+
+    @Test
+    public void hashBytesFromLargerDirectByteBuffer() throws Exception {
+        ByteBuffer directBuf = ByteBuffer.allocateDirect(1024);
+        for (int i = 0; i < 100; i++) {
+            directBuf.putInt(i);
+        }
+        directBuf.position(0);
+        assertArrayEquals(Hex.hexToBytes("daf10ea8894783b1b2618309494cde21"),
+                          Digest.forReadResponse().update(directBuf).digest());
+    }
+
+    @Test
+    public void hashBytesFromTinyOnHeapByteBuffer() throws Exception {
+        ByteBuffer onHeapBuf = ByteBuffer.allocate(8);
+        onHeapBuf.putLong(5L);
+        onHeapBuf.position(0);
+        assertArrayEquals(Hex.hexToBytes("aaa07454fa93ed2d37b4c5da9f2f87fd"),
+                          Digest.forReadResponse().update(onHeapBuf).digest());
+    }
+
+    @Test
+    public void hashBytesFromLargerOnHeapByteBuffer() throws Exception {
+        ByteBuffer onHeapBuf = ByteBuffer.allocate(1024);
+        for (int i = 0; i < 100; i++) {
+            onHeapBuf.putInt(i);
+        }
+        onHeapBuf.position(0);
+        assertArrayEquals(Hex.hexToBytes("daf10ea8894783b1b2618309494cde21"),
+                          Digest.forReadResponse().update(onHeapBuf).digest());
+    }
+
+    @Test
+    public void testValidatorDigest()
+    {
+        Digest[] digests = new Digest[]
+                           {
+                           Digest.forValidator(),
+                           new Digest(Hashing.murmur3_128(1000).newHasher()),
+                           new Digest(Hashing.murmur3_128(2000).newHasher())
+                           };
+        byte [] random = UUIDGen.getTimeUUIDBytes();
+
+        for (Digest digest : digests)
+        {
+            digest.updateWithByte((byte) 33)
+                  .update(random, 0, random.length)
+                  .update(ByteBuffer.wrap(random))
+                  .update(random, 0, 3)
+                  .updateWithBoolean(false)
+                  .updateWithInt(77)
+                  .updateWithLong(101);
+        }
+
+        long len = Byte.BYTES
+                   + random.length * 2 // both the byte[] and the ByteBuffer
+                   + 3 // 3 bytes from the random byte[]
+                   + Byte.BYTES
+                   + Integer.BYTES
+                   + Long.BYTES;
+
+        assertEquals(len, digests[0].inputBytes());
+        byte[] h = digests[0].digest();
+        assertArrayEquals(digests[1].digest(), Arrays.copyOfRange(h, 0, 16));
+        assertArrayEquals(digests[2].digest(), Arrays.copyOfRange(h, 16, 32));
+    }
+
+}
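
testValidatorDigest above pins down what Digest.forValidator() is expected to do in place of the
removed Validator.CountingHasher: feed two murmur3_128 hashers (seeds 1000 and 2000) identical
input, concatenate their 16-byte results into a 32-byte digest, and count the input bytes. A rough
self-contained sketch of that shape, covering only the byte[] overload and not the actual Digest
implementation:

    import com.google.common.hash.Hasher;
    import com.google.common.hash.Hashing;

    class CountingDigestSketch
    {
        private final Hasher low = Hashing.murmur3_128(1000).newHasher();  // seeds taken from the test
        private final Hasher high = Hashing.murmur3_128(2000).newHasher();
        private long inputBytes;

        CountingDigestSketch update(byte[] bytes, int offset, int len)
        {
            low.putBytes(bytes, offset, len);
            high.putBytes(bytes, offset, len);
            inputBytes += len;
            return this;
        }

        long inputBytes() { return inputBytes; }

        byte[] digest()
        {
            byte[] out = new byte[32];
            System.arraycopy(low.hash().asBytes(), 0, out, 0, 16);
            System.arraycopy(high.hash().asBytes(), 0, out, 16, 16);
            return out;
        }
    }
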
diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java
index 38fb38a..be3a9e4 100644
--- a/test/unit/org/apache/cassandra/db/PartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionTest.java
@@ -20,11 +20,13 @@ package org.apache.cassandra.db;
 
 import java.io.IOException;
 import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
 
-import com.google.common.hash.Hasher;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
@@ -40,9 +42,9 @@ import org.apache.cassandra.Util;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.HashingUtils;
 
 import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
@@ -138,28 +140,22 @@ public class PartitionTest
             ImmutableBTreePartition p1 = Util.getOnlyPartitionUnfiltered(cmd1);
             ImmutableBTreePartition p2 = Util.getOnlyPartitionUnfiltered(cmd2);
 
-            Hasher hasher1 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
-            Hasher hasher2 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
-            UnfilteredRowIterators.digest(p1.unfilteredIterator(), hasher1, version);
-            UnfilteredRowIterators.digest(p2.unfilteredIterator(), hasher2, version);
-            Assert.assertFalse(hasher1.hash().equals(hasher2.hash()));
+            byte[] digest1 = getDigest(p1.unfilteredIterator(), version);
+            byte[] digest2 = getDigest(p2.unfilteredIterator(), version);
+            assertFalse(Arrays.equals(digest1, digest2));
 
             p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
             p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
-            hasher1 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
-            hasher2 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
-            UnfilteredRowIterators.digest(p1.unfilteredIterator(), hasher1, version);
-            UnfilteredRowIterators.digest(p2.unfilteredIterator(), hasher2, version);
-            Assert.assertEquals(hasher1.hash(), hasher2.hash());
+            digest1 = getDigest(p1.unfilteredIterator(), version);
+            digest2 = getDigest(p2.unfilteredIterator(), version);
+            assertArrayEquals(digest1, digest2);
 
             p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
             RowUpdateBuilder.deleteRow(cfs.metadata(), 6, "key2", "c").applyUnsafe();
             p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
-            hasher1 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
-            hasher2 = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
-            UnfilteredRowIterators.digest(p1.unfilteredIterator(), hasher1, version);
-            UnfilteredRowIterators.digest(p2.unfilteredIterator(), hasher2, version);
-            Assert.assertFalse(hasher1.hash().equals(hasher2.hash()));
+            digest1 = getDigest(p1.unfilteredIterator(), version);
+            digest2 = getDigest(p2.unfilteredIterator(), version);
+            assertFalse(Arrays.equals(digest1, digest2));
         }
         finally
         {
@@ -167,6 +163,13 @@ public class PartitionTest
         }
     }
 
+    private byte[] getDigest(UnfilteredRowIterator partition, int version)
+    {
+        Digest digest = Digest.forReadResponse();
+        UnfilteredRowIterators.digest(partition, digest, version);
+        return digest.digest();
+    }
+
     @Test
     public void testColumnStatsRecordsRowDeletesCorrectly()
     {
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 8b73502..c04f489 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowIterator;
@@ -72,6 +73,7 @@ import org.apache.cassandra.utils.UUIDGen;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class ReadCommandTest
 {
@@ -82,6 +84,7 @@ public class ReadCommandTest
     private static final String CF4 = "Standard4";
     private static final String CF5 = "Standard5";
     private static final String CF6 = "Standard6";
+    private static final String CF7 = "Counter7";
 
     private static final InetAddressAndPort REPAIR_COORDINATOR;
     static {
@@ -151,6 +154,13 @@ public class ReadCommandTest
                      .addRegularColumn("b", AsciiType.instance)
                      .caching(CachingParams.CACHE_EVERYTHING);
 
+        TableMetadata.Builder metadata7 =
+        TableMetadata.builder(KEYSPACE, CF7)
+                     .flags(EnumSet.of(TableMetadata.Flag.COUNTER))
+                     .addPartitionKeyColumn("key", BytesType.instance)
+                     .addClusteringColumn("col", AsciiType.instance)
+                     .addRegularColumn("c", CounterColumnType.instance);
+
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE,
                                     KeyspaceParams.simple(1),
@@ -159,7 +169,8 @@ public class ReadCommandTest
                                     metadata3,
                                     metadata4,
                                     metadata5,
-                                    metadata6);
+                                    metadata6,
+                                    metadata7);
 
         LocalSessionAccessor.startup();
     }
@@ -647,6 +658,66 @@ public class ReadCommandTest
         assertEquals(1, readCount(sstables.get(1)));
     }
 
+    @Test
+    public void dontIncludeLegacyCounterContextInDigest() throws IOException
+    {
+        // Serializations of a CounterContext containing legacy (pre-2.1) shards
+        // can legitimately differ across replicas. For this reason, the context
+        // bytes are omitted from the repaired digest if they contain legacy shards.
+        // This clearly trades off some efficacy of the digest, but without it false
+        // positive digest mismatches would be reported for scenarios where there is
+        // nothing that can be done to "fix" the replicas.
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF7);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        // insert a row with the counter column having value 0, in a legacy shard.
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
+                .clustering("aa")
+                .addLegacyCounterCell("c", 0L)
+                .build()
+                .apply();
+        cfs.forceBlockingFlush();
+        cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+
+        // execute a read and capture the digest
+        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+        ByteBuffer digestWithLegacyCounter0 = performReadAndVerifyRepairedInfo(readCommand, 1, 1, true);
+        assertFalse(ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(digestWithLegacyCounter0));
+
+        // truncate, then re-insert the same partition, but this time with a legacy
+        // shard having the value 1. The repaired digest should match the previous, as
+        // the values (context) are not included, only the cell metadata (ttl, timestamp, etc)
+        cfs.truncateBlocking();
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
+                .clustering("aa")
+                .addLegacyCounterCell("c", 1L)
+                .build()
+                .apply();
+        cfs.forceBlockingFlush();
+        cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+
+        ByteBuffer digestWithLegacyCounter1 = performReadAndVerifyRepairedInfo(readCommand, 1, 1, true);
+        assertEquals(digestWithLegacyCounter0, digestWithLegacyCounter1);
+
+        // truncate, then re-insert the same partition, but this time with a non-legacy
+        // counter cell present. The repaired digest should not match the previous ones
+        // as this time the value (context) is included.
+        cfs.truncateBlocking();
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
+                .clustering("aa")
+                .add("c", 1L)
+                .build()
+                .apply();
+        cfs.forceBlockingFlush();
+        cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+
+        ByteBuffer digestWithCounterCell = performReadAndVerifyRepairedInfo(readCommand, 1, 1, true);
+        assertFalse(ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(digestWithCounterCell));
+        assertFalse(digestWithLegacyCounter0.equals(digestWithCounterCell));
+        assertFalse(digestWithLegacyCounter1.equals(digestWithCounterCell));
+    }
+
     private long readCount(SSTableReader sstable)
     {
         return sstable.getReadMeter().count();
@@ -810,10 +881,19 @@ public class ReadCommandTest
         }
     }
 
-    private void mutateRepaired(ColumnFamilyStore cfs, SSTableReader sstable, long repairedAt, UUID pendingSession) throws IOException
+    private void mutateRepaired(ColumnFamilyStore cfs, SSTableReader sstable, long repairedAt, UUID pendingSession)
     {
-        sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingSession, false);
-        sstable.reloadSSTableMetadata();
+        try
+        {
+            sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingSession, false);
+            sstable.reloadSSTableMetadata();
+        }
+        catch (IOException e)
+        {
+            e.printStackTrace();
+            fail("Caught IOException when mutating sstable metadata");
+        }
+
         if (pendingSession != null)
         {
             // setup a minimal repair session. This is necessary because we
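
The dontIncludeLegacyCounterContextInDigest test above encodes a simple rule: when a counter cell
may carry pre-2.1 legacy shards, only the cell metadata feeds the repaired-data digest and the
counter context bytes are skipped, so replicas whose legacy contexts legitimately differ still
produce matching digests. A toy, self-contained illustration of that rule (the names and the use
of Guava hashing are assumptions, not Cassandra's Cell/Digest code):

    import java.nio.ByteBuffer;
    import com.google.common.hash.Hasher;
    import com.google.common.hash.Hashing;

    class LegacyCounterDigestSketch
    {
        static void digestCell(Hasher hasher, long timestamp, ByteBuffer counterContext, boolean hasLegacyShards)
        {
            hasher.putLong(timestamp);                       // metadata is always included
            if (!hasLegacyShards)
                hasher.putBytes(counterContext.duplicate()); // context bytes only for modern cells
        }

        public static void main(String[] args)
        {
            Hasher a = Hashing.md5().newHasher();
            Hasher b = Hashing.md5().newHasher();
            // Same metadata, different legacy contexts -> identical digests, as asserted above.
            digestCell(a, 0L, ByteBuffer.wrap(new byte[]{ 0 }), true);
            digestCell(b, 0L, ByteBuffer.wrap(new byte[]{ 1 }), true);
            System.out.println(a.hash().equals(b.hash())); // true
        }
    }
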
diff --git a/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java b/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
index 1ac5440..3a07a00 100644
--- a/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
+++ b/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.db;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.rows.*;
@@ -181,4 +183,12 @@ public class RowUpdateBuilder
     {
         return delete(columnMetadata.name.toString());
     }
+
+    public RowUpdateBuilder addLegacyCounterCell(String columnName, long value)
+    {
+        assert updateBuilder.metadata().getColumn(new ColumnIdentifier(columnName, true)).isCounterColumn();
+        ByteBuffer val = CounterContext.instance().createLocal(value);
+        rowBuilder().add(columnName, val);
+        return this;
+    }
 }
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index a288edb..cf3411a 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.repair;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -28,8 +27,6 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.hash.Hasher;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.CompactionsTest;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -61,6 +58,7 @@ import org.apache.cassandra.utils.MerkleTrees;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -197,8 +195,8 @@ public class ValidatorTest
         SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
         UUID repairSessionId = UUIDGen.getTimeUUID();
         final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
-                                               cfs.getTableName(), Collections.singletonList(new Range<>(sstable.first.getToken(),
-                                                                                                                sstable.last.getToken())));
+                                                     cfs.getTableName(), Collections.singletonList(new Range<>(sstable.first.getToken(),
+                                                                                                               sstable.last.getToken())));
 
         InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.2");
 
@@ -371,45 +369,6 @@ public class ValidatorTest
         return left;
     }
 
-    @Test
-    public void testCountingHasher()
-    {
-        Hasher [] hashers = new Hasher[] {new Validator.CountingHasher(), Validator.CountingHasher.hashFunctions[0].newHasher(), Validator.CountingHasher.hashFunctions[1].newHasher() };
-        byte [] random = UUIDGen.getTimeUUIDBytes();
-
-        // call all overloaded methods:
-        for (Hasher hasher : hashers)
-        {
-            hasher.putByte((byte) 33)
-                  .putBytes(random)
-                  .putBytes(ByteBuffer.wrap(random))
-                  .putBytes(random, 0, 3)
-                  .putChar('a')
-                  .putBoolean(false)
-                  .putDouble(3.3)
-                  .putInt(77)
-                  .putFloat(99)
-                  .putLong(101)
-                  .putShort((short) 23);
-        }
-
-        long len = Byte.BYTES
-                   + random.length * 2 // both the byte[] and the ByteBuffer
-                   + 3 // 3 bytes from the random byte[]
-                   + Character.BYTES
-                   + Byte.BYTES
-                   + Double.BYTES
-                   + Integer.BYTES
-                   + Float.BYTES
-                   + Long.BYTES
-                   + Short.BYTES;
-
-        byte [] h = hashers[0].hash().asBytes();
-        assertTrue(Arrays.equals(hashers[1].hash().asBytes(), Arrays.copyOfRange(h, 0, 16)));
-        assertTrue(Arrays.equals(hashers[2].hash().asBytes(), Arrays.copyOfRange(h, 16, 32)));
-        assertEquals(len, ((Validator.CountingHasher)hashers[0]).getCount());
-    }
-
     private CompletableFuture<Message> registerOutgoingMessageSink()
     {
         final CompletableFuture<Message> future = new CompletableFuture<>();
diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java
index 8881018..8ec0177 100644
--- a/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java
+++ b/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java
@@ -25,13 +25,13 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.junit.Test;
 
+import org.apache.cassandra.db.Digest;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.TreeResponse;
-import org.apache.cassandra.utils.HashingUtils;
 import org.apache.cassandra.utils.MerkleTree;
 import org.apache.cassandra.utils.MerkleTrees;
 import org.apache.cassandra.utils.MerkleTreesTest;
@@ -43,7 +43,9 @@ public class DifferenceHolderTest
 {
     private static byte[] digest(String string)
     {
-        return HashingUtils.newMessageDigest("SHA-256").digest(string.getBytes());
+        return Digest.forValidator()
+                     .update(string.getBytes(), 0, string.getBytes().length)
+                     .digest();
     }
 
     @Test
diff --git a/test/unit/org/apache/cassandra/utils/HashingUtilsTest.java b/test/unit/org/apache/cassandra/utils/HashingUtilsTest.java
deleted file mode 100644
index 3988903..0000000
--- a/test/unit/org/apache/cassandra/utils/HashingUtilsTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.utils;
-
-import java.nio.ByteBuffer;
-
-import com.google.common.hash.Hasher;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class HashingUtilsTest
-{
-    private static final Logger logger = LoggerFactory.getLogger(HashingUtilsTest.class);
-
-    @Test
-    public void hashEmptyBytes() throws Exception {
-        Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
-        HashingUtils.updateBytes(hasher, ByteBuffer.wrap(new byte[]{}));
-        String md5HashInHexOfEmptyByteArray = ByteBufferUtil.bytesToHex(ByteBuffer.wrap(hasher.hash().asBytes()));
-        Assert.assertEquals("d41d8cd98f00b204e9800998ecf8427e", md5HashInHexOfEmptyByteArray);
-    }
-
-    @Test
-    public void hashBytesFromTinyDirectByteBuffer() throws Exception {
-        Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
-        ByteBuffer directBuf = ByteBuffer.allocateDirect(8);
-        directBuf.putLong(5L);
-        directBuf.position(0);
-        HashingUtils.updateBytes(hasher, directBuf);
-
-        String md5HashInHexOfDirectByteBuffer = ByteBufferUtil.bytesToHex(ByteBuffer.wrap(hasher.hash().asBytes()));
-        Assert.assertEquals("aaa07454fa93ed2d37b4c5da9f2f87fd", md5HashInHexOfDirectByteBuffer);
-    }
-
-    @Test
-    public void hashBytesFromLargerDirectByteBuffer() throws Exception {
-        Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
-        ByteBuffer directBuf = ByteBuffer.allocateDirect(1024);
-        for (int i = 0; i < 100; i++) {
-            directBuf.putInt(i);
-        }
-        directBuf.position(0);
-        HashingUtils.updateBytes(hasher, directBuf);
-
-        String md5HashInHexOfDirectByteBuffer = ByteBufferUtil.bytesToHex(ByteBuffer.wrap(hasher.hash().asBytes()));
-        Assert.assertEquals("daf10ea8894783b1b2618309494cde21", md5HashInHexOfDirectByteBuffer);
-    }
-
-    @Test
-    public void hashBytesFromTinyOnHeapByteBuffer() throws Exception {
-        Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
-        ByteBuffer onHeapBuf = ByteBuffer.allocate(8);
-        onHeapBuf.putLong(5L);
-        onHeapBuf.position(0);
-        HashingUtils.updateBytes(hasher, onHeapBuf);
-
-        String md5HashInHexOfDirectByteBuffer = ByteBufferUtil.bytesToHex(ByteBuffer.wrap(hasher.hash().asBytes()));
-        Assert.assertEquals("aaa07454fa93ed2d37b4c5da9f2f87fd", md5HashInHexOfDirectByteBuffer);
-    }
-
-    @Test
-    public void hashBytesFromLargerOnHeapByteBuffer() throws Exception {
-        Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
-        ByteBuffer onHeapBuf = ByteBuffer.allocate(1024);
-        for (int i = 0; i < 100; i++) {
-            onHeapBuf.putInt(i);
-        }
-        onHeapBuf.position(0);
-        HashingUtils.updateBytes(hasher, onHeapBuf);
-
-        String md5HashInHexOfDirectByteBuffer = ByteBufferUtil.bytesToHex(ByteBuffer.wrap(hasher.hash().asBytes()));
-        Assert.assertEquals("daf10ea8894783b1b2618309494cde21", md5HashInHexOfDirectByteBuffer);
-    }
-}
diff --git a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
index 36ae4a0..1cdcc22 100644
--- a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
@@ -26,6 +26,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Digest;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -48,9 +49,11 @@ public class MerkleTreeTest
 {
     private static final byte[] DUMMY = digest("dummy");
 
-    private static byte[] digest(String string)
+    static byte[] digest(String string)
     {
-        return HashingUtils.newMessageDigest("SHA-256").digest(string.getBytes());
+        return Digest.forValidator()
+                     .update(string.getBytes(), 0, string.getBytes().length)
+                     .digest();
     }
 
     /**
diff --git a/test/unit/org/apache/cassandra/utils/MerkleTreesTest.java b/test/unit/org/apache/cassandra/utils/MerkleTreesTest.java
index 9e70c20..5b589fb 100644
--- a/test/unit/org/apache/cassandra/utils/MerkleTreesTest.java
+++ b/test/unit/org/apache/cassandra/utils/MerkleTreesTest.java
@@ -38,17 +38,13 @@ import org.apache.cassandra.utils.MerkleTree.RowHash;
 import org.apache.cassandra.utils.MerkleTree.TreeRange;
 import org.apache.cassandra.utils.MerkleTrees.TreeRangeIterator;
 
+import static org.apache.cassandra.utils.MerkleTreeTest.digest;
 import static org.junit.Assert.*;
 
 public class MerkleTreesTest
 {
     private static final byte[] DUMMY = digest("dummy");
 
-    private static byte[] digest(String string)
-    {
-        return HashingUtils.newMessageDigest("SHA-256").digest(string.getBytes());
-    }
-
     /**
      * If a test assumes that the tree is 8 units wide, then it should set this value
      * to 8.



[cassandra] 02/02: Exclude purgeable tombstones from repaired data digest

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9a280516ca8b9e730ae0648e5e29ee6280605132
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Wed Dec 18 18:31:36 2019 +0000

    Exclude purgeable tombstones from repaired data digest
    
    Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-15462
---
 CHANGES.txt                                        |   1 +
 .../cassandra/db/PartitionRangeReadCommand.java    |   2 +-
 src/java/org/apache/cassandra/db/ReadCommand.java  | 160 ++++++++++--
 .../cassandra/db/SinglePartitionReadCommand.java   |   2 +-
 .../cassandra/db/partitions/PurgeFunction.java     |   7 +-
 .../distributed/test/RepairDigestTrackingTest.java |  54 ++++
 .../org/apache/cassandra/db/ReadCommandTest.java   | 272 ++++++++++++++++++++-
 7 files changed, 473 insertions(+), 25 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 3d5217a..43126ed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha3
+ * Exclude purgeable tombstones from repaired data tracking (CASSANDRA-15462)
  * Exclude legacy counter shards from repaired data tracking (CASSANDRA-15461)
  * Make it easier to add trace headers to messages (CASSANDRA-15499)
  * Fix and optimise partial compressed sstable streaming (CASSANDRA-13938)
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 2145389..cb68950 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -282,7 +282,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
             if (inputCollector.isEmpty())
                 return EmptyIterators.unfilteredPartition(metadata());
 
-            return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(inputCollector.finalizeIterators()), cfs);
+            return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(inputCollector.finalizeIterators(cfs, nowInSec(), oldestUnrepairedTombstone)), cfs);
         }
         catch (RuntimeException | Error e)
         {
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 9485abc..4f8ea3e 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -90,7 +90,7 @@ public abstract class ReadCommand extends AbstractReadQuery
 
     // for data queries, coordinators may request information on the repaired data used in constructing the response
     private boolean trackRepairedStatus = false;
-    // tracker for repaired data, initialized to singelton null object
+    // tracker for repaired data, initialized to singleton null object
     private static final RepairedDataInfo NULL_REPAIRED_DATA_INFO = new RepairedDataInfo()
     {
         void trackPartitionKey(DecoratedKey key){}
@@ -724,7 +724,7 @@ public abstract class ReadCommand extends AbstractReadQuery
     }
 
     private static UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator,
-                                                               final RepairedDataInfo repairedDataInfo)
+                                                                    final RepairedDataInfo repairedDataInfo)
     {
         class WithRepairedDataTracking extends Transformation<UnfilteredRowIterator>
         {
@@ -744,6 +744,7 @@ public abstract class ReadCommand extends AbstractReadQuery
         {
             protected DecoratedKey applyToPartitionKey(DecoratedKey key)
             {
+                repairedDataInfo.onNewPartition(iterator);
                 repairedDataInfo.trackPartitionKey(key);
                 return key;
             }
@@ -762,7 +763,7 @@ public abstract class ReadCommand extends AbstractReadQuery
 
             protected Row applyToStatic(Row row)
             {
-                repairedDataInfo.trackRow(row);
+                repairedDataInfo.trackStaticRow(row);
                 return row;
             }
 
@@ -771,21 +772,48 @@ public abstract class ReadCommand extends AbstractReadQuery
                 repairedDataInfo.trackRow(row);
                 return row;
             }
-        }
 
+            protected void onPartitionClose()
+            {
+                repairedDataInfo.onPartitionClose();
+            }
+        }
         return Transformation.apply(iterator, new WithTracking());
     }
 
     private static class RepairedDataInfo
     {
-        private Digest hasher;
+        // Keeps a digest of the partition currently being processed. Since we won't know
+        // whether a partition will be fully purged from a read result until it's been
+        // consumed, we buffer this per-partition digest and add it to the final digest
+        // when the partition is closed (if it wasn't fully purged).
+        private Digest perPartitionDigest;
+        private Digest perCommandDigest;
         private boolean isConclusive = true;
 
+        // Doesn't actually purge from the underlying iterators, but excludes purged data
+        // from the digest. The purger can't be initialized until we've iterated all the
+        // sstables for the query, as it requires the oldest unrepaired tombstone.
+        private RepairedDataPurger purger;
+        private boolean isFullyPurged = true;
+
         ByteBuffer getDigest()
         {
-            return hasher == null
+            return perCommandDigest == null
                    ? ByteBufferUtil.EMPTY_BYTE_BUFFER
-                   : ByteBuffer.wrap(getHasher().digest());
+                   : ByteBuffer.wrap(perCommandDigest.digest());
+        }
+
+        protected void onNewPartition(UnfilteredRowIterator partition)
+        {
+            assert purger != null;
+            purger.setCurrentKey(partition.partitionKey());
+            purger.setIsReverseOrder(partition.isReverseOrder());
+        }
+
+        protected void setPurger(RepairedDataPurger purger)
+        {
+            this.purger = purger;
         }
 
         boolean isConclusive()
@@ -800,30 +828,128 @@ public abstract class ReadCommand extends AbstractReadQuery
 
         void trackPartitionKey(DecoratedKey key)
         {
-            getHasher().update(key.getKey());
+            getPerPartitionDigest().update(key.getKey());
         }
 
         void trackDeletion(DeletionTime deletion)
         {
-            deletion.digest(getHasher());
+            assert purger != null;
+            DeletionTime purged = purger.applyToDeletion(deletion);
+            if (!purged.isLive())
+                isFullyPurged = false;
+
+            purged.digest(getPerPartitionDigest());
         }
 
         void trackRangeTombstoneMarker(RangeTombstoneMarker marker)
         {
-            marker.digest(getHasher());
+            assert purger != null;
+            RangeTombstoneMarker purged = purger.applyToMarker(marker);
+            if (purged != null)
+            {
+                isFullyPurged = false;
+                purged.digest(getPerPartitionDigest());
+            }
+        }
+
+        void trackStaticRow(Row row)
+        {
+            assert purger != null;
+            Row purged = purger.applyToRow(row);
+            if (!purged.isEmpty())
+            {
+                isFullyPurged = false;
+                purged.digest(getPerPartitionDigest());
+            }
         }
 
         void trackRow(Row row)
         {
-            row.digest(getHasher());
+            assert purger != null;
+            Row purged = purger.applyToRow(row);
+            if (purged != null)
+            {
+                isFullyPurged = false;
+                purged.digest(getPerPartitionDigest());
+            }
+        }
+
+        private Digest getPerPartitionDigest()
+        {
+            if (perPartitionDigest == null)
+                perPartitionDigest = Digest.forRepairedDataTracking();
+
+            return perPartitionDigest;
+        }
+
+        private void onPartitionClose()
+        {
+            if (perPartitionDigest != null)
+            {
+                // If the partition wasn't completely emptied by the purger,
+                // calculate the digest for the partition and use it to
+                // update the overall digest
+                if (!isFullyPurged)
+                {
+                    if (perCommandDigest == null)
+                        perCommandDigest = Digest.forRepairedDataTracking();
+
+                    byte[] partitionDigest = perPartitionDigest.digest();
+                    perCommandDigest.update(partitionDigest, 0, partitionDigest.length);
+                    isFullyPurged = true;
+                }
+
+                perPartitionDigest = null;
+            }
+        }
+    }
+
+    /**
+     * Although PurgeFunction extends Transformation, this is never applied to an iterator.
+     * Instead, it is used by RepairedDataInfo during the generation of a repaired data
+     * digest to exclude data which will actually be purged later on in the read pipeline.
+     */
+    private static class RepairedDataPurger extends PurgeFunction
+    {
+        RepairedDataPurger(ColumnFamilyStore cfs,
+                           int nowInSec,
+                           int oldestUnrepairedTombstone)
+        {
+            super(nowInSec,
+                  cfs.gcBefore(nowInSec),
+                  oldestUnrepairedTombstone,
+                  cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(),
+                  cfs.metadata.get().enforceStrictLiveness());
         }
 
-        private Digest getHasher()
+        protected LongPredicate getPurgeEvaluator()
         {
-            if (hasher == null)
-                hasher = Digest.forRepairedDataTracking();
+            return (time) -> true;
+        }
+
+        void setCurrentKey(DecoratedKey key)
+        {
+            super.onNewPartition(key);
+        }
+
+        void setIsReverseOrder(boolean isReverseOrder)
+        {
+            super.setReverseOrder(isReverseOrder);
+        }
 
-            return hasher;
+        public DeletionTime applyToDeletion(DeletionTime deletionTime)
+        {
+            return super.applyToDeletion(deletionTime);
+        }
+
+        public Row applyToRow(Row row)
+        {
+            return super.applyToRow(row);
+        }
+
+        public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+        {
+            return super.applyToMarker(marker);
         }
     }
 
@@ -912,12 +1038,14 @@ public abstract class ReadCommand extends AbstractReadQuery
                 unrepairedIters.add(iter);
         }
 
-        List<T> finalizeIterators()
+        List<T> finalizeIterators(ColumnFamilyStore cfs, int nowInSec, int oldestUnrepairedTombstone)
         {
             if (repairedIters.isEmpty())
                 return unrepairedIters;
 
             // merge the repaired data before returning, wrapping in a digest generator
+            RepairedDataPurger purger = new RepairedDataPurger(cfs, nowInSec, oldestUnrepairedTombstone);
+            repairedDataInfo.setPurger(purger);
             unrepairedIters.add(repairedMerger.apply(repairedIters, repairedDataInfo));
             return unrepairedIters;
         }
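
The reworked RepairedDataInfo above buffers a per-partition digest and folds it into the
per-command digest on partition close only when the purger left something behind, so partitions
that are fully purgeable never influence the repaired-data digest. A compact sketch of that
buffering pattern (illustrative names, plain MessageDigest standing in for
Digest.forRepairedDataTracking()):

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    class PerPartitionDigestSketch
    {
        private MessageDigest perPartition;
        private MessageDigest perCommand;
        private boolean fullyPurged = true;

        private static MessageDigest md5()
        {
            try { return MessageDigest.getInstance("MD5"); }
            catch (NoSuchAlgorithmException e) { throw new RuntimeException(e); }
        }

        // Called for each piece of partition content that survives the purger.
        void trackUnpurged(byte[] bytes)
        {
            if (perPartition == null)
                perPartition = md5();
            fullyPurged = false;
            perPartition.update(bytes);
        }

        // Called when the partition is closed; empty (fully purged) partitions are dropped.
        void onPartitionClose()
        {
            if (perPartition != null && !fullyPurged)
            {
                if (perCommand == null)
                    perCommand = md5();
                perCommand.update(perPartition.digest());
            }
            perPartition = null;
            fullyPurged = true;
        }

        byte[] digest()
        {
            return perCommand == null ? new byte[0] : perCommand.digest();
        }
    }
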
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index eb57b93..4daad7d 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -695,7 +695,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
 
             StorageHook.instance.reportRead(cfs.metadata().id, partitionKey());
 
-            return withSSTablesIterated(inputCollector.finalizeIterators(), cfs.metric, metricsCollector);
+            return withSSTablesIterated(inputCollector.finalizeIterators(cfs, nowInSec(), oldestUnrepairedTombstone), cfs.metric, metricsCollector);
         }
         catch (RuntimeException | Error e)
         {
diff --git a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
index 8dcd359..d9e9036 100644
--- a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
+++ b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
@@ -60,13 +60,18 @@ public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator
     {
     }
 
+    protected void setReverseOrder(boolean isReverseOrder)
+    {
+        this.isReverseOrder = isReverseOrder;
+    }
+
     @Override
     @SuppressWarnings("resource")
     protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
     {
         onNewPartition(partition.partitionKey());
 
-        isReverseOrder = partition.isReverseOrder();
+        setReverseOrder(partition.isReverseOrder());
         UnfilteredRowIterator purged = Transformation.apply(partition, this);
         if (purged.isEmpty())
         {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
index a987ea3..1af329f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
 import java.util.EnumSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -113,6 +114,59 @@ public class RepairDigestTrackingTest extends DistributedTestBase implements Ser
         }
     }
 
+    @Test
+    public void testPurgeableTombstonesAreIgnored() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+
+            cluster.get(1).runOnInstance(() -> {
+                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
+            });
+
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl2 (k INT, c INT, v1 INT, v2 INT, PRIMARY KEY (k,c)) WITH gc_grace_seconds=0");
+            // on node1 only insert some tombstones, then flush
+            for (int i = 0; i < 10; i++)
+            {
+                cluster.get(1).executeInternal("DELETE v1 FROM " + KEYSPACE + ".tbl2 USING TIMESTAMP 0 WHERE k=? and c=? ", i, i);
+            }
+            cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
+
+            // insert data on both nodes and flush
+            for (int i = 0; i < 10; i++)
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 1",
+                                               ConsistencyLevel.ALL,
+                                               i, i, i);
+            }
+            cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
+            cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
+
+            // nothing is repaired yet
+            cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired));
+            cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired));
+            // mark everything repaired
+            cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired));
+            cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired));
+            cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired));
+            cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired));
+
+            // now overwrite on node2 only to generate digest mismatches, but don't flush so the repaired dataset is not affected
+            for (int i = 0; i < 10; i++)
+            {
+                cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 2", i, i, i * 2);
+            }
+
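+            // capture the confirmed repaired data inconsistency count before reading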
+            long ccBefore = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount());
+            // Unfortunately we need to sleep here to ensure that nowInSec > the local deletion time of the tombstones
+            TimeUnit.SECONDS.sleep(2);
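+            // a range read at ALL makes the coordinator compare the repaired data digests reported by both replicas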
+            cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl2", ConsistencyLevel.ALL);
+            long ccAfter = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount());
+
+            Assert.assertEquals("No repaired data inconsistencies should be detected", ccBefore, ccAfter);
+        }
+    }
+
     private void assertNotRepaired(SSTableReader reader) {
         Assert.assertTrue("repaired at is set for sstable: " + reader.descriptor, getRepairedAt(reader) == ActiveRepairService.UNREPAIRED_SSTABLE);
     }
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index c04f489..4419c70 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -24,6 +24,7 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.junit.Assert;
@@ -40,10 +41,12 @@ import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.dht.Range;
@@ -61,8 +64,11 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
 import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableParams;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.tracing.Tracing;
@@ -70,6 +76,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
+import static org.apache.cassandra.utils.ByteBufferUtil.EMPTY_BYTE_BUFFER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -85,6 +92,7 @@ public class ReadCommandTest
     private static final String CF5 = "Standard5";
     private static final String CF6 = "Standard6";
     private static final String CF7 = "Counter7";
+    private static final String CF8 = "Standard8";
 
     private static final InetAddressAndPort REPAIR_COORDINATOR;
     static {
@@ -161,6 +169,14 @@ public class ReadCommandTest
                      .addClusteringColumn("col", AsciiType.instance)
                      .addRegularColumn("c", CounterColumnType.instance);
 
+        TableMetadata.Builder metadata8 =
+        TableMetadata.builder(KEYSPACE, CF8)
+                     .addPartitionKeyColumn("key", BytesType.instance)
+                     .addClusteringColumn("col", AsciiType.instance)
+                     .addRegularColumn("a", AsciiType.instance)
+                     .addRegularColumn("b", AsciiType.instance)
+                     .addRegularColumn("c", SetType.getInstance(AsciiType.instance, true));
+
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE,
                                     KeyspaceParams.simple(1),
@@ -170,7 +186,8 @@ public class ReadCommandTest
                                     metadata4,
                                     metadata5,
                                     metadata6,
-                                    metadata7);
+                                    metadata7,
+                                    metadata8);
 
         LocalSessionAccessor.startup();
     }
@@ -683,7 +700,7 @@ public class ReadCommandTest
         // execute a read and capture the digest
         ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
         ByteBuffer digestWithLegacyCounter0 = performReadAndVerifyRepairedInfo(readCommand, 1, 1, true);
-        assertFalse(ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(digestWithLegacyCounter0));
+        assertFalse(EMPTY_BYTE_BUFFER.equals(digestWithLegacyCounter0));
 
         // truncate, then re-insert the same partition, but this time with a legacy
         // shard having the value 1. The repaired digest should match the previous, as
@@ -713,11 +730,254 @@ public class ReadCommandTest
         cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
 
         ByteBuffer digestWithCounterCell = performReadAndVerifyRepairedInfo(readCommand, 1, 1, true);
-        assertFalse(ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(digestWithCounterCell));
+        assertFalse(EMPTY_BYTE_BUFFER.equals(digestWithCounterCell));
         assertFalse(digestWithLegacyCounter0.equals(digestWithCounterCell));
         assertFalse(digestWithLegacyCounter1.equals(digestWithCounterCell));
     }
 
+    /**
+     * Writes several partitions containing different kinds of tombstones (a simple cell tombstone, a
+     * collection with a complex deletion, a range tombstone alongside a row with a non-purgeable cell
+     * tombstone, and a fully deleted row) and reads each back using a partition read. The repaired data
+     * digests generated by executing the same query before and after the tombstones become eligible for
+     * purging should not match each other. Also, no digest should be empty, as none of the partitions is
+     * made empty by the purging.
+     */
+    @Test
+    public void purgeGCableTombstonesBeforeCalculatingDigest() throws Exception
+    {
+        KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(KEYSPACE);
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF8);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
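+        // gc_grace_seconds is 600, so the tombstones written below are not yet eligible for purging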
+        setGCGrace(cfs, 600);
+
+        DecoratedKey[] keys = new DecoratedKey[] { Util.dk("key0"), Util.dk("key1"), Util.dk("key2"), Util.dk("key3") };
+        int nowInSec = FBUtilities.nowInSeconds();
+        TableMetadata cfm = cfs.metadata();
+
+        // A simple tombstone
+        new RowUpdateBuilder(cfs.metadata(), 0, keys[0]).clustering("cc").delete("a").build().apply();
+
+        // Collection with an associated complex deletion
+        PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(cfs.metadata(), keys[1]).timestamp(0);
+        builder.row("cc").add("c", ImmutableSet.of("element1", "element2"));
+        builder.buildAsMutation().apply();
+
+        // RangeTombstone and a row (not covered by the RT). The row contains a regular tombstone which will not be
+        // purged. This is to prevent the partition from being fully purged and removed from the final results
+        new RowUpdateBuilder(cfs.metadata(), nowInSec, 0L, keys[2]).addRangeTombstone("aa", "bb").build().apply();
+        new RowUpdateBuilder(cfs.metadata(), nowInSec + 1000, 1000L, keys[2]).clustering("cc").delete("a").build().apply();
+
+        // Partition with 2 rows, one fully deleted
+        new RowUpdateBuilder(cfs.metadata.get(), 0, keys[3]).clustering("bb").add("a", ByteBufferUtil.bytes("a")).delete("b").build().apply();
+        RowUpdateBuilder.deleteRow(cfs.metadata(), 0, keys[3], "cc").apply();
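+        // flush and mark the resulting sstables repaired so the data above is included in repaired data digests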
+        cfs.forceBlockingFlush();
+        cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+
+        Map<DecoratedKey, ByteBuffer> digestsWithTombstones = new HashMap<>();
+        // Tombstones are not yet purgeable
+        for (DecoratedKey key : keys)
+        {
+            ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec).build();
+            cmd.trackRepairedStatus();
+            Partition partition = Util.getOnlyPartitionUnfiltered(cmd);
+            assertFalse(partition.isEmpty());
+            partition.unfilteredIterator().forEachRemaining(u -> {
+                // must be either a RT, or a row containing some kind of deletion
+                assertTrue(u.isRangeTombstoneMarker() || ((Row)u).hasDeletion(cmd.nowInSec()));
+            });
+            ByteBuffer digestWithTombstones = cmd.getRepairedDataDigest();
+            // None should generate an empty digest
+            assertDigestsDiffer(EMPTY_BYTE_BUFFER, digestWithTombstones);
+            digestsWithTombstones.put(key, digestWithTombstones);
+        }
+
+        // Make tombstones eligible for purging and re-run cmd with an incremented nowInSec
+        setGCGrace(cfs, 0);
+
+        // Tombstones are now purgeable, so they won't appear in the read results and the digests will differ
+        for (DecoratedKey key : keys)
+        {
+            ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec + 1).build();
+            cmd.trackRepairedStatus();
+            Partition partition = Util.getOnlyPartitionUnfiltered(cmd);
+            assertFalse(partition.isEmpty());
+            partition.unfilteredIterator().forEachRemaining(u -> {
+                // After purging, only rows without any deletions should remain.
+                // The one exception is "key2:cc", whose regular column tombstone is not yet
+                // eligible for purging; it is there to prevent the partition from being fully
+                // purged when its RT is removed.
+                assertTrue(u.isRow());
+                Row r = (Row)u;
+                assertTrue(!r.hasDeletion(cmd.nowInSec())
+                           || (key.equals(keys[2]) && r.clustering()
+                                                       .get(0)
+                                                       .equals(AsciiType.instance.fromString("cc"))));
+
+            });
+            ByteBuffer digestWithoutTombstones = cmd.getRepairedDataDigest();
+            // not an empty digest
+            assertDigestsDiffer(EMPTY_BYTE_BUFFER, digestWithoutTombstones);
+            // should not match the pre-purge digest
+            assertDigestsDiffer(digestsWithTombstones.get(key), digestWithoutTombstones);
+        }
+    }
+
+    private void setGCGrace(ColumnFamilyStore cfs, int gcGrace)
+    {
+        TableParams newParams = cfs.metadata().params.unbuild().gcGraceSeconds(gcGrace).build();
+        KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(cfs.metadata().keyspace);
+        Schema.instance.load(
+            keyspaceMetadata.withSwapped(
+                keyspaceMetadata.tables.withSwapped(
+                    cfs.metadata().withSwapped(newParams))));
+    }
+
+    private void assertDigestsDiffer(ByteBuffer b0, ByteBuffer b1)
+    {
+        assertTrue(ByteBufferUtil.compareUnsigned(b0, b1) != 0);
+    }
+
+    @Test
+    public void partitionReadFullyPurged() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+        ReadCommand partitionRead = Util.cmd(cfs, Util.dk("key")).build();
+        fullyPurgedPartitionCreatesEmptyDigest(cfs, partitionRead);
+    }
+
+    @Test
+    public void rangeReadFullyPurged() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+        ReadCommand rangeRead = Util.cmd(cfs).build();
+        fullyPurgedPartitionCreatesEmptyDigest(cfs, rangeRead);
+    }
+
+    /**
+     * Writes a single partition containing only a single row deletion and reads with either a range or
+     * partition query. Before the row deletion is eligible for purging, it should appear in the query
+     * results and cause a non-empty repaired data digest to be generated. After the row deletion
+     * becomes eligible for purging, repeating the query should yield an empty result set and an
+     * empty repaired data digest.
+     */
+    private void fullyPurgedPartitionCreatesEmptyDigest(ColumnFamilyStore cfs, ReadCommand command) throws Exception
+    {
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+        setGCGrace(cfs, 600);
+
+        // Partition with a single, fully deleted row
+        RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key"), "cc").apply();
+        cfs.forceBlockingFlush();
+        cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+
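+        // the row deletion is not yet purgeable (gc_grace is 600), so it should yield a non-empty repaired data digest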
+        command.trackRepairedStatus();
+        List<ImmutableBTreePartition> partitions = Util.getAllUnfiltered(command);
+        assertEquals(1, partitions.size());
+        ByteBuffer digestWithTombstones = command.getRepairedDataDigest();
+        assertTrue(ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithTombstones) != 0);
+
+        // Make tombstones eligible for purging and re-run cmd with an incremented nowInSec
+        setGCGrace(cfs, 0);
+
+        AbstractReadCommandBuilder builder = command instanceof PartitionRangeReadCommand
+                                             ? Util.cmd(cfs)
+                                             : Util.cmd(cfs, Util.dk("key"));
+        builder.withNowInSeconds(command.nowInSec() + 60);
+        command = builder.build();
+        command.trackRepairedStatus();
+
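+        // the partition is now fully purged, so it should be absent from the results and the digest empty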
+        partitions = Util.getAllUnfiltered(command);
+        assertTrue(partitions.isEmpty());
+        ByteBuffer digestWithoutTombstones = command.getRepairedDataDigest();
+        assertEquals(0, ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithoutTombstones));
+    }
+
+    /**
+     * Verifies that during range reads which include multiple partitions, fully purged partitions
+     * have no material effect on the calculated digest. This test writes two sstables, each containing
+     * a single partition; the first is live and the second fully deleted and eligible for purging.
+     * Initially, only the sstable containing the live partition is marked repaired, and a range read
+     * covering both partitions is performed to generate a digest. Then the sstable containing the
+     * purged partition is also marked repaired and the query is re-executed. The digests produced by both
+     * queries should match as the digest calculation should exclude the fully purged partition.
+     */
+    @Test
+    public void mixedPurgedAndNonPurgedPartitions()
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+        setGCGrace(cfs, 0);
+
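+        // with gc_grace_seconds at 0 and nowInSec 60s in the future, the row deletion below is immediately purgeable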
+        ReadCommand command = Util.cmd(cfs).withNowInSeconds(FBUtilities.nowInSeconds() + 60).build();
+
+        // Live partition in a repaired sstable, so included in the digest calculation
+        new RowUpdateBuilder(cfs.metadata.get(), 0, ByteBufferUtil.bytes("key-0")).clustering("cc").add("a", ByteBufferUtil.bytes("a")).build().apply();
+        cfs.forceBlockingFlush();
+        cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+        // Fully deleted partition in an unrepaired sstable, so not included in the initial digest
+        RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key-1"), "cc").apply();
+        cfs.forceBlockingFlush();
+
+        command.trackRepairedStatus();
+        List<ImmutableBTreePartition> partitions = Util.getAllUnfiltered(command);
+        assertEquals(1, partitions.size());
+        ByteBuffer digestWithoutPurgedPartition = command.getRepairedDataDigest();
+        assertTrue(ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithoutPurgedPartition) != 0);
+
+        // Mark the sstable containing the purged partition as repaired, so both partitions are now
+        // included in the digest calculation. Because the purged partition is entirely discarded,
+        // the resulting digest should match the earlier one.
+        cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+        command = Util.cmd(cfs).withNowInSeconds(command.nowInSec()).build();
+        command.trackRepairedStatus();
+
+        partitions = Util.getAllUnfiltered(command);
+        assertEquals(1, partitions.size());
+        ByteBuffer digestWithPurgedPartition = command.getRepairedDataDigest();
+        assertEquals(0, ByteBufferUtil.compareUnsigned(digestWithPurgedPartition, digestWithoutPurgedPartition));
+    }
+
+    @Test
+    public void purgingConsidersRepairedDataOnly() throws Exception
+    {
+        // Two sstables: the first is repaired and contains only purgeable data; the second is
+        // unrepaired and contains non-purgeable data. Even though the partition itself is not
+        // fully purged, the repaired data digest should be empty because no non-purgeable,
+        // repaired data was read.
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+        setGCGrace(cfs, 0);
+
+        // Partition with a single, fully deleted row which will be fully purged
+        DecoratedKey key = Util.dk("key");
+        RowUpdateBuilder.deleteRow(cfs.metadata(), 0, key, "cc").apply();
+        cfs.forceBlockingFlush();
+        cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+
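+        // a live, unrepaired row in the same partition; it survives purging but must not contribute to the repaired data digest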
+        new RowUpdateBuilder(cfs.metadata(), 1, key).clustering("cc").add("a", ByteBufferUtil.bytes("a")).build().apply();
+        cfs.forceBlockingFlush();
+
+        int nowInSec = FBUtilities.nowInSeconds() + 10;
+        ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec).build();
+        cmd.trackRepairedStatus();
+        Partition partition = Util.getOnlyPartitionUnfiltered(cmd);
+        assertFalse(partition.isEmpty());
+        // check that only the live row remains (the purgeable row deletion is gone) and the repaired digest is empty
+        try (UnfilteredRowIterator rows = partition.unfilteredIterator())
+        {
+            assertFalse(rows.isEmpty());
+            Unfiltered unfiltered = rows.next();
+            assertFalse(rows.hasNext());
+            assertTrue(unfiltered.isRow());
+            assertFalse(((Row) unfiltered).hasDeletion(nowInSec));
+        }
+        assertEquals(EMPTY_BYTE_BUFFER, cmd.getRepairedDataDigest());
+    }
+
     private long readCount(SSTableReader sstable)
     {
         return sstable.getReadMeter().count();
@@ -833,7 +1093,7 @@ public class ReadCommandTest
         Set<ByteBuffer> digests = new HashSet<>();
         // first time round, nothing has been marked repaired so we expect digest to be an empty buffer and to be marked conclusive
         ByteBuffer digest = performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, true);
-        assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+        assertEquals(EMPTY_BYTE_BUFFER, digest);
         digests.add(digest);
 
         // add a pending repair session to table1, digest should remain the same but now we expect it to be marked inconclusive
@@ -872,12 +1132,12 @@ public class ReadCommandTest
                                         .delete()
                                         .build()).apply();
             digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false);
-            assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+            assertEquals(EMPTY_BYTE_BUFFER, digest);
 
             // now flush so we have an unrepaired table with the deletion and repeat the check
             cfs.forceBlockingFlush();
             digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false);
-            assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+            assertEquals(EMPTY_BYTE_BUFFER, digest);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org