Posted to commits@cassandra.apache.org by bl...@apache.org on 2020/04/09 13:21:00 UTC

[cassandra] branch trunk updated: Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times.

This is an automated email from the ASF dual-hosted git repository.

blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 00fb6d7  Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times.
00fb6d7 is described below

commit 00fb6d76d0a97af06ba27c1180d6dcddfa337fea
Author: Francisco Fernandez Castano <fr...@gmail.com>
AuthorDate: Wed Mar 25 12:15:20 2020 +0100

    Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times.
    
    patch by Francisco Fernandez; reviewed by Benjamin Lerer and Robert Stupp
    for CASSANDRA-14773
    
    This patch:
    * prevents 32-bit integer overflow (see the sketch below)
    * simplifies the underlying structures of StreamingTombstoneHistogramBuilder
    * avoids humongous allocations by maintaining separate arrays for tombstone timestamps and numbers of
    tombstone occurrences (previously both were interleaved in the same array)
    * introduces more test coverage.
---
 CHANGES.txt                                        |   1 +
 .../StreamingTombstoneHistogramBuilder.java        | 437 +++++++++------------
 .../utils/streamhist/TombstoneHistogram.java       |   7 +-
 .../StreamingTombstoneHistogramBuilderTest.java    | 232 ++++++++++-
 4 files changed, 423 insertions(+), 254 deletions(-)
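
For illustration only, not part of the patch: a minimal sketch of the 32-bit overflow this change guards against, and the shape of the saturating cast it introduces (mirroring the patch's saturatingCastToInt):

    // Sketch: adding two large tombstone counts in int arithmetic wraps around.
    public class OverflowSketch
    {
        public static void main(String[] args)
        {
            int value1 = 2_000_000_000, value2 = 2_000_000_000;
            System.out.println(value1 + value2);             // -294967296: wrapped, nonsense
            long sum = (long) value1 + value2;               // evaluate in long instead
            System.out.println((int) Math.min(sum, Integer.MAX_VALUE)); // 2147483647, saturated
        }
    }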

diff --git a/CHANGES.txt b/CHANGES.txt
index 86a8813..46790c9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha4
+ * Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times (CASSANDRA-14773)
  * Mark system_views/system_virtual_schema as system keyspaces in cqlsh (CASSANDRA-15706)
  * Avoid unnecessary collection/iterator allocations during btree construction (CASSANDRA-15390)
  * Repair history tables should have TTL and TWCS (CASSANDRA-12701)
diff --git a/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java b/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java
index 9856253..eda88bc 100755
--- a/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java
+++ b/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java
@@ -22,28 +22,32 @@ import java.math.RoundingMode;
 import java.util.Arrays;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.math.IntMath;
 
-import static org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder.AddResult.ACCUMULATED;
-import static org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder.AddResult.INSERTED;
+import org.apache.cassandra.db.rows.Cell;
 
 /**
  * Histogram that can be constructed from streaming of data.
+ *
+ * This histogram is used, for example, to retrieve the number of droppable tombstones via
+ * {@link org.apache.cassandra.io.sstable.format.SSTableReader#getDroppableTombstonesBefore(int)}.
  * <p>
- * The original algorithm is taken from following paper:
- * Yael Ben-Haim and Elad Tom-Tov, "A Streaming Parallel Decision Tree Algorithm" (2010)
- * http://jmlr.csail.mit.edu/papers/volume11/ben-haim10a/ben-haim10a.pdf
+ * When an sstable is written (or streamed), this histogram-builder receives the "local deletion timestamp"
+ * as an {@code int} via {@link #update(int)}. Negative values are not supported.
  * <p>
 * Algorithm: the histogram is represented as a collection of {point, weight} pairs. When a new point <i>p</i> with weight <i>m</i> is added:
  * <ol>
- * <li>If point <i>p</i> is already exists in collection, add <i>m</i> to recorded value of point <i>p</i> </li>
- * <li>If there is no point <i>p</i> in the collection, add point <i>p</i> with weight <i>m</i> </li>
- * <li>If point was added and collection size became lorger than maxBinSize:</li>
- * <ol type="a">
- * <li>Find nearest points <i>p1</i> and <i>p2</i> in the collection </li>
- * <li>Replace theese two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2)</i></li>
+ *     <li>If point <i>p</i> already exists in the collection, add <i>m</i> to the recorded value of point <i>p</i></li>
+ *     <li>If there is no point <i>p</i> in the collection, add point <i>p</i> with weight <i>m</i></li>
+ *     <li>If the point was added and the collection size became larger than maxBinSize:</li>
  * </ol>
+ *
+ * <ol type="a">
+ *     <li>Find the nearest points <i>p1</i> and <i>p2</i> in the collection</li>
+ *     <li>Replace these two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(m1+m2)</i></li>
  * </ol>
+ *
  * <p>
 * There are some optimizations to make the histogram builder faster:
  * <ol>
@@ -51,19 +55,19 @@ import static org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramB
 *     For example, if spoolSize=100, binSize=10 and there are only 50 different points, there will be only 40 merges regardless of how many points are added.</li>
 *     <li>Spool is organized as an open-addressing primitive hash map where odd elements are points and even elements are values.
 *     Spool cannot resize => when the number of collisions becomes bigger than a threshold, or the size becomes larger than <i>array_size/2</i>, Spool is drained to the bin</li>
- *     <li>DistanceHolder - sorted collection of distances between points in Bin. It is used to find nearest points in constant time</li>
- *     <li>Distances and Bin organized as sorted arrays. It reduces garbage collection pressure and allows to find elements in log(binSize) time via binary search</li>
- *     <li>To use existing Arrays.binarySearch <i></>{point, values}</i> in bin and <i></>{distance, left_point}</i> pairs is packed in one long</li>
+ *     <li>Bin is organized as a sorted array. This reduces garbage-collection pressure and allows finding elements in log(binSize) time via binary search</li>
+ *     <li>To reuse the existing Arrays.binarySearch, each <i>{point, value}</i> pair in the bin is packed into one long</li>
  * </ol>
+ * <p>
+ * The original algorithm is taken from the following paper:
+ * Yael Ben-Haim and Elad Tom-Tov, "A Streaming Parallel Decision Tree Algorithm" (2010)
+ * http://jmlr.csail.mit.edu/papers/volume11/ben-haim10a/ben-haim10a.pdf
  */
 public class StreamingTombstoneHistogramBuilder
 {
     // Buffer with point-value pair
     private final DataHolder bin;
 
-    // Buffer with distance between points, sorted from nearest to furthest
-    private final DistanceHolder distances;
-
     // Keep a second, larger buffer to spool data in, before finalizing it into `bin`
     private final Spool spool;
 
@@ -72,47 +76,42 @@ public class StreamingTombstoneHistogramBuilder
 
     public StreamingTombstoneHistogramBuilder(int maxBinSize, int maxSpoolSize, int roundSeconds)
     {
+        assert maxBinSize > 0 && maxSpoolSize >= 0 && roundSeconds > 0: "Invalid arguments: maxBinSize:" + maxBinSize + " maxSpoolSize:" + maxSpoolSize + " delta:" + roundSeconds;
+
         this.roundSeconds = roundSeconds;
         this.bin = new DataHolder(maxBinSize + 1, roundSeconds);
-        distances = new DistanceHolder(maxBinSize);
-
-        //for spool we need power-of-two cells
-        maxSpoolSize = maxSpoolSize == 0 ? 0 : IntMath.pow(2, IntMath.log2(maxSpoolSize, RoundingMode.CEILING));
-        spool = new Spool(maxSpoolSize);
+        this.spool = new Spool(maxSpoolSize);
     }
 
     /**
-     * Adds new point p to this histogram.
+     * Adds a new point to this histogram with a default value of 1.
      *
-     * @param p
+     * @param point the point to be added
      */
-    public void update(int p)
+    public void update(int point)
     {
-        update(p, 1);
+        update(point, 1);
     }
 
     /**
-     * Adds new point p with value m to this histogram.
-     *
-     * @param p
-     * @param m
+     * Adds a new point {@code point} with value {@code value} to this histogram.
      */
-    public void update(int p, int m)
+    public void update(int point, int value)
     {
-        p = roundKey(p, roundSeconds);
+        point = ceilKey(point, roundSeconds);
 
         if (spool.capacity > 0)
         {
-            if (!spool.tryAddOrAccumulate(p, m))
+            if (!spool.tryAddOrAccumulate(point, value))
             {
                 flushHistogram();
-                final boolean success = spool.tryAddOrAccumulate(p, m);
+                final boolean success = spool.tryAddOrAccumulate(point, value);
                 assert success : "Can not add value to spool"; // after spool flushing we should always be able to insert new value
             }
         }
         else
         {
-            flushValue(p, m);
+            flushValue(point, value);
         }
     }
 
@@ -127,50 +126,11 @@ public class StreamingTombstoneHistogramBuilder
 
     private void flushValue(int key, int spoolValue)
     {
-        DataHolder.NeighboursAndResult addResult = bin.addValue(key, spoolValue);
-        if (addResult.result == INSERTED)
-        {
-            final int prevPoint = addResult.prevPoint;
-            final int nextPoint = addResult.nextPoint;
-            if (prevPoint != -1 && nextPoint != -1)
-                distances.remove(prevPoint, nextPoint);
-            if (prevPoint != -1)
-                distances.add(prevPoint, key);
-            if (nextPoint != -1)
-                distances.add(key, nextPoint);
-        }
+        bin.addValue(key, spoolValue);
 
         if (bin.isFull())
         {
-            mergeBin();
-        }
-    }
-
-    private void mergeBin()
-    {
-        // find points point1, point2 which have smallest difference
-        final int[] smallestDifference = distances.getFirstAndRemove();
-
-        final int point1 = smallestDifference[0];
-        final int point2 = smallestDifference[1];
-
-        // merge those two
-        DataHolder.MergeResult mergeResult = bin.merge(point1, point2);
-
-        final int nextPoint = mergeResult.nextPoint;
-        final int prevPoint = mergeResult.prevPoint;
-        final int newPoint = mergeResult.newPoint;
-
-        if (nextPoint != -1)
-        {
-            distances.remove(point2, nextPoint);
-            distances.add(newPoint, nextPoint);
-        }
-
-        if (prevPoint != -1)
-        {
-            distances.remove(prevPoint, point1);
-            distances.add(prevPoint, newPoint);
+            bin.mergeNearestPoints();
         }
     }
 
@@ -185,72 +145,10 @@ public class StreamingTombstoneHistogramBuilder
         return new TombstoneHistogram(bin);
     }
 
-    private static class DistanceHolder
-    {
-        private static final long EMPTY = Long.MAX_VALUE;
-        private final long[] data;
-
-        DistanceHolder(int maxCapacity)
-        {
-            data = new long[maxCapacity];
-            Arrays.fill(data, EMPTY);
-        }
-
-        void add(int prev, int next)
-        {
-            long key = getKey(prev, next);
-            int index = Arrays.binarySearch(data, key);
-
-            assert (index < 0) : "Element already exists";
-            assert (data[data.length - 1] == EMPTY) : "No more space in array";
-
-            index = -index - 1;
-            System.arraycopy(data, index, data, index + 1, data.length - index - 1);
-            data[index] = key;
-        }
-
-        void remove(int prev, int next)
-        {
-            long key = getKey(prev, next);
-            int index = Arrays.binarySearch(data, key);
-            if (index >= 0)
-            {
-                if (index < data.length)
-                    System.arraycopy(data, index + 1, data, index, data.length - index - 1);
-                data[data.length - 1] = EMPTY;
-            }
-        }
-
-        int[] getFirstAndRemove()
-        {
-            if (data[0] == EMPTY)
-                return null;
-
-            int[] result = unwrapKey(data[0]);
-            System.arraycopy(data, 1, data, 0, data.length - 1);
-            data[data.length - 1] = EMPTY;
-            return result;
-        }
-
-        private int[] unwrapKey(long key)
-        {
-            final int distance = (int) (key >> 32);
-            final int prev = (int) (key & 0xFF_FF_FF_FFL);
-            return new int[]{ prev, prev + distance };
-        }
-
-        private long getKey(int prev, int next)
-        {
-            long distance = next - prev;
-            return (distance << 32) | prev;
-        }
-
-        public String toString()
-        {
-            return Arrays.stream(data).filter(x -> x != EMPTY).boxed().map(this::unwrapKey).map(Arrays::toString).collect(Collectors.joining());
-        }
-    }
-
+    /**
+     * An ordered collection of histogram buckets; each entry represents a (bucket, count) pair.
+     * Once the collection is full, it merges the closest buckets using a weighted approach; see {@link #mergeNearestPoints()}.
+     */
     static class DataHolder
     {
         private static final long EMPTY = Long.MAX_VALUE;
@@ -270,11 +168,29 @@ public class StreamingTombstoneHistogramBuilder
             roundSeconds = holder.roundSeconds;
         }
 
-        NeighboursAndResult addValue(int point, int delta)
+        @VisibleForTesting
+        int getValue(int point)
+        {
+            long key = wrap(point, 0);
+            int index = Arrays.binarySearch(data, key);
+            if (index < 0)
+                index = -index - 1;
+            if (index >= data.length)
+                return -1; // not-found sentinel
+            if (unwrapPoint(data[index]) != point)
+                return -2; // not-found sentinel
+            return unwrapValue(data[index]);
+        }
+
+        /**
+         * Adds value {@code delta} to the point {@code point}.
+         *
+         * @return {@code true} if inserted, {@code false} if accumulated
+         */
+        boolean addValue(int point, int delta)
         {
             long key = wrap(point, 0);
             int index = Arrays.binarySearch(data, key);
-            AddResult addResult;
             if (index < 0)
             {
                 index = -index - 1;
@@ -287,25 +203,35 @@ public class StreamingTombstoneHistogramBuilder
                     System.arraycopy(data, index, data, index + 1, data.length - index - 1);
 
                     data[index] = wrap(point, delta);
-                    addResult = INSERTED;
+                    return true;
                 }
                 else
                 {
-                    data[index] += delta;
-                    addResult = ACCUMULATED;
+                    data[index] = wrap(point, (long) unwrapValue(data[index]) + delta);
                 }
             }
             else
             {
-                data[index] += delta;
-                addResult = ACCUMULATED;
+                data[index] = wrap(point, (long) unwrapValue(data[index]) + delta);
             }
 
-            return new NeighboursAndResult(getPrevPoint(index), getNextPoint(index), addResult);
+            return false;
         }
 
-        public MergeResult merge(int point1, int point2)
+        /**
+         * Finds the nearest points <i>p1</i> and <i>p2</i> in the collection and
+         * replaces these two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(m1+m2)</i>.
+         */
+        @VisibleForTesting
+        void mergeNearestPoints()
         {
+            assert isFull() : "DataHolder must be full in order to merge two points";
+
+            final int[] smallestDifference = findPointPairWithSmallestDistance();
+
+            final int point1 = smallestDifference[0];
+            final int point2 = smallestDifference[1];
+
             long key = wrap(point1, 0);
             int index = Arrays.binarySearch(data, key);
             if (index < 0)
@@ -315,47 +241,44 @@ public class StreamingTombstoneHistogramBuilder
                 assert (unwrapPoint(data[index]) == point1) : "Not found in array";
             }
 
-            final int prevPoint = getPrevPoint(index);
-            final int nextPoint = getNextPoint(index + 1);
-
-            int value1 = unwrapValue(data[index]);
-            int value2 = unwrapValue(data[index + 1]);
+            long value1 = unwrapValue(data[index]);
+            long value2 = unwrapValue(data[index + 1]);
 
             assert (unwrapPoint(data[index + 1]) == point2) : "point2 should follow point1";
 
-            int sum = value1 + value2;
+            long sum = value1 + value2;
 
             //let's evaluate in long values to handle overflow in multiplication
-            int newPoint = (int) (((long) point1 * value1 + (long) point2 * value2) / (value1 + value2));
-            newPoint = roundKey(newPoint, roundSeconds);
-            data[index] = wrap(newPoint, sum);
+            int newPoint = saturatingCastToInt((point1 * value1 + point2 * value2) / sum);
+            newPoint = ceilKey(newPoint, roundSeconds);
+            data[index] = wrap(newPoint, saturatingCastToInt(sum));
 
             System.arraycopy(data, index + 2, data, index + 1, data.length - index - 2);
             data[data.length - 1] = EMPTY;
-
-            return new MergeResult(prevPoint, newPoint, nextPoint);
         }
 
-        private int getPrevPoint(int index)
+        private int[] findPointPairWithSmallestDistance()
         {
-            if (index > 0)
-                if (data[index - 1] != EMPTY)
-                    return (int) (data[index - 1] >> 32);
-                else
-                    return -1;
-            else
-                return -1;
-        }
+            assert isFull(): "The DataHolder must be full in order to find the closest pair of points";
 
-        private int getNextPoint(int index)
-        {
-            if (index < data.length - 1)
-                if (data[index + 1] != EMPTY)
-                    return (int) (data[index + 1] >> 32);
-                else
-                    return -1;
-            else
-                return -1;
+            int point1 = 0;
+            int point2 = Integer.MAX_VALUE;
+
+            for (int i = 0; i < data.length - 1; i++)
+            {
+                int pointA = unwrapPoint(data[i]);
+                int pointB = unwrapPoint(data[i + 1]);
+
+                assert pointB > pointA : "DataHolder not sorted, p2(" + pointB +") < p1(" + pointA + ") for " + this;
+
+                if (point2 - point1 > pointB - pointA)
+                {
+                    point1 = pointA;
+                    point2 = pointB;
+                }
+            }
+
+            return new int[]{point1, point2};
         }
 
         private int[] unwrap(long key)
@@ -375,15 +298,15 @@ public class StreamingTombstoneHistogramBuilder
             return (int) (key & 0xFF_FF_FF_FFL);
         }
 
-        private long wrap(int point, int value)
+        private long wrap(int point, long value)
         {
-            return (((long) point) << 32) | value;
+            assert point >= 0 : "Invalid argument: point:" + point;
+            return (((long) point) << 32) | saturatingCastToInt(value);
         }
 
-
         public String toString()
         {
-            return Arrays.stream(data).filter(x -> x != EMPTY).boxed().map(this::unwrap).map(Arrays::toString).collect(Collectors.joining());
+            return Arrays.stream(data).filter(x -> x != EMPTY).mapToObj(this::unwrap).map(Arrays::toString).collect(Collectors.joining());
         }
 
         public boolean isFull()
@@ -434,6 +357,7 @@ public class StreamingTombstoneHistogramBuilder
                     {
                         final int prevPoint = unwrapPoint(data[i - 1]);
                         final int prevValue = unwrapValue(data[i - 1]);
+                        // calculate estimated count mb for point b
                         double weight = (b - prevPoint) / (double) (point - prevPoint);
                         double mb = prevValue + (value - prevValue) * weight;
                         sum -= prevValue;
@@ -450,34 +374,6 @@ public class StreamingTombstoneHistogramBuilder
             return sum;
         }
 
-        static class MergeResult
-        {
-            int prevPoint;
-            int newPoint;
-            int nextPoint;
-
-            MergeResult(int prevPoint, int newPoint, int nextPoint)
-            {
-                this.prevPoint = prevPoint;
-                this.newPoint = newPoint;
-                this.nextPoint = nextPoint;
-            }
-        }
-
-        static class NeighboursAndResult
-        {
-            int prevPoint;
-            int nextPoint;
-            AddResult result;
-
-            NeighboursAndResult(int prevPoint, int nextPoint, AddResult result)
-            {
-                this.prevPoint = prevPoint;
-                this.nextPoint = nextPoint;
-                this.result = result;
-            }
-        }
-
         @Override
         public int hashCode()
         {
@@ -506,38 +402,42 @@ public class StreamingTombstoneHistogramBuilder
         }
     }
 
-    public enum AddResult
-    {
-        INSERTED,
-        ACCUMULATED
-    }
-
+    /**
+     * This class is a specialized open-addressing hash map with int keys and int values,
+     * an optimization to avoid allocating objects.
+     * For this class to work correctly, its capacity must be a power of two.
+     * This invariant is enforced during construction.
+     */
     static class Spool
     {
-        // odd elements - points, even elements - values
-        final int[] map;
+        final int[] points;
+        final int[] values;
+
         final int capacity;
         int size;
 
-        Spool(int capacity)
+        Spool(int requestedCapacity)
         {
-            this.capacity = capacity;
-            if (capacity == 0)
-            {
-                map = new int[0];
-            }
-            else
-            {
-                assert IntMath.isPowerOfTwo(capacity) : "should be power of two";
-                // x2 because we want to save points and values in consecutive cells and x2 because we want reprobing less that two when _capacity_ values will be written
-                map = new int[capacity * 2 * 2];
-                clear();
-            }
+            if (requestedCapacity < 0)
+                throw new IllegalArgumentException("Illegal capacity " + requestedCapacity);
+
+            this.capacity = getPowerOfTwoCapacity(requestedCapacity);
+
+            // x2 because we want no more than two reprobes on average once _capacity_ entries have been written
+            points = new int[capacity * 2];
+            values = new int[capacity * 2];
+            clear();
+        }
+
+        private int getPowerOfTwoCapacity(int requestedCapacity)
+        {
+            //for spool we need power-of-two cells
+            return requestedCapacity == 0 ? 0 : IntMath.pow(2, IntMath.log2(requestedCapacity, RoundingMode.CEILING));
         }
 
         void clear()
         {
-            Arrays.fill(map, -1);
+            Arrays.fill(points, -1);
             size = 0;
         }
 
@@ -548,12 +448,12 @@ public class StreamingTombstoneHistogramBuilder
                 return false;
             }
 
-            final int cell = 2 * ((capacity - 1) & hash(point));
+            final int cell = (capacity - 1) & hash(point);
 
            // We use linear scanning; a cluster of 100 elements is large enough to give up.
             for (int attempt = 0; attempt < 100; attempt++)
             {
-                if (tryCell(cell + attempt * 2, point, delta))
+                if (tryCell(cell + attempt, point, delta))
                     return true;
             }
             return false;
@@ -567,40 +467,75 @@ public class StreamingTombstoneHistogramBuilder
 
         <E extends Exception> void forEach(HistogramDataConsumer<E> consumer) throws E
         {
-            for (int i = 0; i < map.length; i += 2)
+            for (int i = 0; i < points.length; i++)
             {
-                if (map[i] != -1)
+                if (points[i] != -1)
                 {
-                    consumer.consume(map[i], map[i + 1]);
+                    consumer.consume(points[i], values[i]);
                 }
             }
         }
 
         private boolean tryCell(int cell, int point, int delta)
         {
-            cell = cell % map.length;
-            if (map[cell] == -1)
+            assert cell >= 0 && point >= 0 && delta >= 0 : "Invalid arguments: cell:" + cell + " point:" + point + " delta:" + delta;
+
+            cell = cell % points.length;
+            if (points[cell] == -1)
             {
-                map[cell] = point;
-                map[cell + 1] = delta;
+                points[cell] = point;
+                values[cell] = delta;
                 size++;
                 return true;
             }
-            if (map[cell] == point)
+            if (points[cell] == point)
             {
-                map[cell + 1] += delta;
+                values[cell] = saturatingCastToInt((long) values[cell] + (long) delta);
                 return true;
             }
             return false;
         }
+
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder();
+            sb.append('[');
+            for (int i = 0; i < points.length; i++)
+            {
+                if (points[i] == -1)
+                    continue;
+                if (sb.length() > 1)
+                    sb.append(", ");
+                sb.append('[').append(points[i]).append(',').append(values[i]).append(']');
+            }
+            sb.append(']');
+            return sb.toString();
+        }
     }
 
-    private static int roundKey(int p, int roundSeconds)
+    private static int ceilKey(int point, int bucketSize)
     {
-        int d = p % roundSeconds;
-        if (d > 0)
-            return p + (roundSeconds - d);
-        else
-            return p;
+        int delta = point % bucketSize;
+
+        if (delta == 0)
+            return point;
+
+        return saturatingCastToMaxDeletionTime((long) point + (long) bucketSize - (long) delta);
+    }
+
+    public static int saturatingCastToInt(long value)
+    {
+        return (int) (value > Integer.MAX_VALUE ? Integer.MAX_VALUE : value);
+    }
+
+    /**
+     * Casts to an int, clamping at {@code Cell.MAX_DELETION_TIME}, to avoid representing values that
+     * are not valid tombstone deletion times.
+     */
+    public static int saturatingCastToMaxDeletionTime(long value)
+    {
+        return (value < 0L || value > Cell.MAX_DELETION_TIME)
+               ? Cell.MAX_DELETION_TIME
+               : (int) value;
     }
 }
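
As a reading aid, here is a self-contained sketch (simplified, not the patch's exact code) of the {point, value} packing that lets DataHolder keep its bin in one sorted long[] and reuse Arrays.binarySearch; the class name and array contents below are made up for illustration:

    import java.util.Arrays;

    public class PackedPairSketch
    {
        // point in the high 32 bits, value in the low 32 bits; since points are
        // non-negative, ordering the packed longs orders the points
        static long wrap(int point, int value) { return (((long) point) << 32) | (value & 0xFF_FF_FF_FFL); }
        static int unwrapPoint(long key)       { return (int) (key >>> 32); }
        static int unwrapValue(long key)       { return (int) (key & 0xFF_FF_FF_FFL); }

        public static void main(String[] args)
        {
            long[] data = { wrap(10, 3), wrap(42, 7), Long.MAX_VALUE }; // MAX_VALUE marks an empty slot
            int index = Arrays.binarySearch(data, wrap(42, 0));         // probe with value 0
            if (index < 0)
                index = -index - 1; // first slot whose point is >= 42
            System.out.println(unwrapPoint(data[index]) + " -> " + unwrapValue(data[index])); // 42 -> 7
        }
    }

Probing with a zero value works because any stored entry for the same point compares greater than or equal to the probe key, so binarySearch lands either on the entry itself or on the slot just before it.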
diff --git a/src/java/org/apache/cassandra/utils/streamhist/TombstoneHistogram.java b/src/java/org/apache/cassandra/utils/streamhist/TombstoneHistogram.java
index 19bdd27..5f2787b 100755
--- a/src/java/org/apache/cassandra/utils/streamhist/TombstoneHistogram.java
+++ b/src/java/org/apache/cassandra/utils/streamhist/TombstoneHistogram.java
@@ -91,7 +91,12 @@ public class TombstoneHistogram
             DataHolder dataHolder = new DataHolder(size, 1);
             for (int i = 0; i < size; i++)
             {
-                dataHolder.addValue((int)in.readDouble(), (int)in.readLong());
+                // Already serialized sstable metadata may contain negative deletion-time values (see CASSANDRA-14092).
+                // Apply a saturating cast to clamp them; for safety, also apply it to the 'value' (tombstone count).
+                int localDeletionTime = StreamingTombstoneHistogramBuilder.saturatingCastToMaxDeletionTime((long) in.readDouble());
+                int count = StreamingTombstoneHistogramBuilder.saturatingCastToInt(in.readLong());
+
+                dataHolder.addValue(localDeletionTime, count);
             }
 
             return new TombstoneHistogram(dataHolder);
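
To make the clamping above concrete: a sketch of the saturating cast's behaviour, assuming (as on trunk) that Cell.MAX_DELETION_TIME is Integer.MAX_VALUE - 1:

    // Mirrors the patch's saturatingCastToMaxDeletionTime; the constant's value is assumed.
    static final int MAX_DELETION_TIME = Integer.MAX_VALUE - 1; // assumed value of Cell.MAX_DELETION_TIME

    static int saturatingCastToMaxDeletionTime(long value)
    {
        return (value < 0L || value > MAX_DELETION_TIME)
               ? MAX_DELETION_TIME
               : (int) value;
    }

    // A negative deletion time from CASSANDRA-14092-era metadata is clamped up:
    // saturatingCastToMaxDeletionTime(-1L) == MAX_DELETION_TIME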
diff --git a/test/unit/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilderTest.java b/test/unit/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilderTest.java
index c4da5cb..596c4d7 100755
--- a/test/unit/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilderTest.java
+++ b/test/unit/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilderTest.java
@@ -20,15 +20,26 @@ package org.apache.cassandra.utils.streamhist;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.IntStream;
 
 import org.junit.Test;
 
+import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.psjava.util.AssertStatus;
+import org.quicktheories.core.Gen;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.quicktheories.QuickTheory.qt;
+import static org.quicktheories.generators.SourceDSL.integers;
+import static org.quicktheories.generators.SourceDSL.lists;
 
 public class StreamingTombstoneHistogramBuilderTest
 {
@@ -111,11 +122,12 @@ public class StreamingTombstoneHistogramBuilderTest
         builder.update(2);
         builder.update(2);
         builder.update(2);
+        builder.update(2, Integer.MAX_VALUE); // To check that value overflow is handled correctly
         TombstoneHistogram hist = builder.build();
         Map<Integer, Integer> asMap = asMap(hist);
 
         assertEquals(1, asMap.size());
-        assertEquals(3, asMap.get(2).intValue());
+        assertEquals(Integer.MAX_VALUE, asMap.get(2).intValue());
 
         //Make sure it's working with Serde
         DataOutputBuffer out = new DataOutputBuffer();
@@ -126,7 +138,7 @@ public class StreamingTombstoneHistogramBuilderTest
 
         asMap = asMap(deserialized);
         assertEquals(1, deserialized.size());
-        assertEquals(3, asMap.get(2).intValue());
+        assertEquals(Integer.MAX_VALUE, asMap.get(2).intValue());
     }
 
     @Test
@@ -167,10 +179,226 @@ public class StreamingTombstoneHistogramBuilderTest
         IntStream.range(Integer.MAX_VALUE - 30, Integer.MAX_VALUE).forEach(builder::update);
     }
 
+    @Test
+    public void testLargeDeletionTimesAndLargeValuesDontCauseOverflow()
+    {
+        qt().forAll(streamingTombstoneHistogramBuilderGen(1000, 300000, 60),
+                    lists().of(integers().from(0).upTo(Cell.MAX_DELETION_TIME)).ofSize(300),
+                    lists().of(integers().allPositive()).ofSize(300))
+            .checkAssert(this::updateHistogramAndCheckAllBucketsArePositive);
+    }
+
+    private void updateHistogramAndCheckAllBucketsArePositive(StreamingTombstoneHistogramBuilder histogramBuilder, List<Integer> keys, List<Integer> values)
+    {
+        for (int i = 0; i < keys.size(); i++)
+        {
+            histogramBuilder.update(keys.get(i), values.get(i));
+        }
+
+        TombstoneHistogram histogram = histogramBuilder.build();
+        for (Map.Entry<Integer, Integer> buckets : asMap(histogram).entrySet())
+        {
+            assertTrue("Invalid bucket key", buckets.getKey() >= 0);
+            assertTrue("Invalid bucket value", buckets.getValue() >= 0);
+        }
+    }
+
+    @Test
+    public void testThatPointIsNotMissedBecauseOfRoundingToNoDeletionTime() throws Exception
+    {
+        int pointThatRoundedToNoDeletion = Cell.NO_DELETION_TIME - 2;
+        assert pointThatRoundedToNoDeletion + pointThatRoundedToNoDeletion % 3 == Cell.NO_DELETION_TIME : "test data should be valid";
+
+        StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(5, 10, 3);
+        builder.update(pointThatRoundedToNoDeletion);
+
+        TombstoneHistogram histogram = builder.build();
+
+        Map<Integer, Integer> integerIntegerMap = asMap(histogram);
+        assertEquals(integerIntegerMap.size(), 1);
+        assertEquals(integerIntegerMap.get(Cell.MAX_DELETION_TIME).intValue(), 1);
+    }
+
+    @Test
+    public void testInvalidArguments()
+    {
+        assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(5, 10, 0)).hasMessage("Invalid arguments: maxBinSize:5 maxSpoolSize:10 delta:0");
+        assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(5, 10, -1)).hasMessage("Invalid arguments: maxBinSize:5 maxSpoolSize:10 delta:-1");
+        assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(5, -1, 60)).hasMessage("Invalid arguments: maxBinSize:5 maxSpoolSize:-1 delta:60");
+        assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(-1, 10, 60)).hasMessage("Invalid arguments: maxBinSize:-1 maxSpoolSize:10 delta:60");
+        assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(0, 10, 60)).hasMessage("Invalid arguments: maxBinSize:0 maxSpoolSize:10 delta:60");
+    }
+
+    @Test
+    public void testSpool()
+    {
+        StreamingTombstoneHistogramBuilder.Spool spool = new StreamingTombstoneHistogramBuilder.Spool(8);
+        assertTrue(spool.tryAddOrAccumulate(5, 1));
+        assertSpool(spool, 5, 1);
+        assertTrue(spool.tryAddOrAccumulate(5, 3));
+        assertSpool(spool, 5, 4);
+
+        assertTrue(spool.tryAddOrAccumulate(10, 1));
+        assertSpool(spool, 5, 4,
+                    10, 1);
+
+        assertTrue(spool.tryAddOrAccumulate(12, 1));
+        assertTrue(spool.tryAddOrAccumulate(14, 1));
+        assertTrue(spool.tryAddOrAccumulate(16, 1));
+        assertSpool(spool, 5, 4,
+                    10, 1,
+                    12, 1,
+                    14, 1,
+                    16, 1);
+
+        assertTrue(spool.tryAddOrAccumulate(18, 1));
+        assertTrue(spool.tryAddOrAccumulate(20, 1));
+        assertTrue(spool.tryAddOrAccumulate(30, 1));
+        assertSpool(spool, 5, 4,
+                    10, 1,
+                    12, 1,
+                    14, 1,
+                    16, 1,
+                    18, 1,
+                    20, 1,
+                    30, 1);
+
+        assertTrue(spool.tryAddOrAccumulate(16, 5));
+        assertTrue(spool.tryAddOrAccumulate(12, 4));
+        assertTrue(spool.tryAddOrAccumulate(18, 9));
+        assertSpool(spool,
+                    5, 4,
+                    10, 1,
+                    12, 5,
+                    14, 1,
+                    16, 6,
+                    18, 10,
+                    20, 1,
+                    30, 1);
+
+        assertTrue(spool.tryAddOrAccumulate(99, 5));
+    }
+
+    @Test
+    public void testDataHolder()
+    {
+        StreamingTombstoneHistogramBuilder.DataHolder dataHolder = new StreamingTombstoneHistogramBuilder.DataHolder(4, 1);
+        assertFalse(dataHolder.isFull());
+        assertEquals(0, dataHolder.size());
+
+        assertTrue(dataHolder.addValue(4, 1));
+        assertDataHolder(dataHolder,
+                         4, 1);
+
+        assertFalse(dataHolder.addValue(4, 1));
+        assertDataHolder(dataHolder,
+                         4, 2);
+
+        assertTrue(dataHolder.addValue(7, 1));
+        assertDataHolder(dataHolder,
+                         4, 2,
+                         7, 1);
+
+        assertFalse(dataHolder.addValue(7, 1));
+        assertDataHolder(dataHolder,
+                         4, 2,
+                         7, 2);
+
+        assertTrue(dataHolder.addValue(5, 1));
+        assertDataHolder(dataHolder,
+                         4, 2,
+                         5, 1,
+                         7, 2);
+
+        assertFalse(dataHolder.addValue(5, 1));
+        assertDataHolder(dataHolder,
+                         4, 2,
+                         5, 2,
+                         7, 2);
+
+        assertTrue(dataHolder.addValue(2, 1));
+        assertDataHolder(dataHolder,
+                         2, 1,
+                         4, 2,
+                         5, 2,
+                         7, 2);
+        assertTrue(dataHolder.isFull());
+
+        // expect to merge [4,2]+[5,2]
+        dataHolder.mergeNearestPoints();
+        assertDataHolder(dataHolder,
+                         2, 1,
+                         4, 4,
+                         7, 2);
+
+        assertFalse(dataHolder.addValue(2, 1));
+        assertDataHolder(dataHolder,
+                         2, 2,
+                         4, 4,
+                         7, 2);
+
+        dataHolder.addValue(8, 1);
+        assertDataHolder(dataHolder,
+                         2, 2,
+                         4, 4,
+                         7, 2,
+                         8, 1);
+        assertTrue(dataHolder.isFull());
+
+        // expect to merge [7,2]+[8,1]
+        dataHolder.mergeNearestPoints();
+        assertDataHolder(dataHolder,
+                         2, 2,
+                         4, 4,
+                         7, 3);
+    }
+
+    private static void assertDataHolder(StreamingTombstoneHistogramBuilder.DataHolder dataHolder, int... pointValue)
+    {
+        assertEquals(pointValue.length / 2, dataHolder.size());
+
+        for (int i = 0; i < pointValue.length; i += 2)
+        {
+            int point = pointValue[i];
+            int expectedValue = pointValue[i + 1];
+            assertEquals(expectedValue, dataHolder.getValue(point));
+        }
+    }
+
+    /**
+     * Compares the contents of {@code spool} with the given collection of key-value pairs in {@code pairs}.
+     */
+    private static void assertSpool(StreamingTombstoneHistogramBuilder.Spool spool, int... pairs)
+    {
+        assertEquals(pairs.length / 2, spool.size);
+        Map<Integer, Integer> tests = new HashMap<>();
+        for (int i = 0; i < pairs.length; i += 2)
+            tests.put(pairs[i], pairs[i + 1]);
+
+        spool.forEach((k, v) -> {
+            Integer x = tests.remove(k);
+            assertNotNull("key " + k, x);
+            assertEquals(x.intValue(), v);
+        });
+        AssertStatus.assertTrue(tests.isEmpty());
+    }
+
     private Map<Integer, Integer> asMap(TombstoneHistogram histogram)
     {
         Map<Integer, Integer> result = new HashMap<>();
         histogram.forEach(result::put);
         return result;
     }
+
+    private Gen<StreamingTombstoneHistogramBuilder> streamingTombstoneHistogramBuilderGen(int maxBinSize, int maxSpoolSize, int maxRoundSeconds)
+    {
+        return positiveIntegerUpTo(maxBinSize).zip(integers().between(0, maxSpoolSize),
+                                                   positiveIntegerUpTo(maxRoundSeconds),
+                                                   StreamingTombstoneHistogramBuilder::new);
+    }
+
+    private Gen<Integer> positiveIntegerUpTo(int upperBound)
+    {
+        return integers().between(1, upperBound);
+    }
 }
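
As a cross-check of the merges that testDataHolder expects, the patch's weighted formula p3 = (p1*m1 + p2*m2) / (m1 + m2) works out as follows (sketch; roundSeconds is 1, so ceilKey leaves the result unchanged):

    // (4,2) merged with (5,2): (4*2 + 5*2) / (2+2) = 18/4 = 4 by integer division -> bucket (4, 4)
    int merged1 = (int) ((4L * 2 + 5L * 2) / (2 + 2));
    // (7,2) merged with (8,1): (7*2 + 8*1) / (2+1) = 22/3 = 7 -> bucket (7, 3)
    int merged2 = (int) ((7L * 2 + 8L * 1) / (2 + 1));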

