Posted to commits@cassandra.apache.org by bl...@apache.org on 2020/04/09 13:21:00 UTC
[cassandra] branch trunk updated: Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times.
This is an automated email from the ASF dual-hosted git repository.
blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 00fb6d7 Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times.
00fb6d7 is described below
commit 00fb6d76d0a97af06ba27c1180d6dcddfa337fea
Author: Francisco Fernandez Castano <fr...@gmail.com>
AuthorDate: Wed Mar 25 12:15:20 2020 +0100
Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times.
patch by Francisco Fernandez; reviewed by Benjamin Lerer and Robert Stupp
for CASSANDRA-14773
This patch:
* prevents 32-bit integer overflow (a sketch of the saturating casts used is given below)
* simplifies the underlying structures of StreamingTombstoneHistogramBuilder
* avoids humongous allocations by maintaining separate arrays for tombstone timestamps and the number of
tombstone occurrences (these two were previously kept in the same array; see the spool sketch after the file summary)
* introduces more test coverage.
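
At its core the fix replaces plain int arithmetic with saturating arithmetic: values that would exceed the int range, or the largest valid tombstone deletion time, are clamped instead of wrapping around to negative numbers. A minimal, self-contained sketch of the two casts (class name and the hard-coded bound are stand-ins for illustration; the real code uses Cell.MAX_DELETION_TIME):

    // Minimal sketch of the two saturating casts introduced by the patch (simplified).
    public final class SaturatingCastSketch
    {
        // Assumed stand-in for Cell.MAX_DELETION_TIME (one below Integer.MAX_VALUE,
        // which is reserved as Cell.NO_DELETION_TIME).
        static final int MAX_DELETION_TIME = Integer.MAX_VALUE - 1;

        // Clamp a long to the int range instead of letting the narrowing cast wrap around.
        static int saturatingCastToInt(long value)
        {
            return (int) (value > Integer.MAX_VALUE ? Integer.MAX_VALUE : value);
        }

        // Clamp to the maximum representable deletion time; negative values (e.g. already
        // serialized corrupted metadata, see CASSANDRA-14092) are clamped as well.
        static int saturatingCastToMaxDeletionTime(long value)
        {
            return (value < 0L || value > MAX_DELETION_TIME) ? MAX_DELETION_TIME : (int) value;
        }

        public static void main(String[] args)
        {
            long accumulated = (long) Integer.MAX_VALUE + 42L;         // would wrap to a negative int
            System.out.println(saturatingCastToInt(accumulated));       // 2147483647
            System.out.println(saturatingCastToMaxDeletionTime(-1L));   // 2147483646
        }
    }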
---
CHANGES.txt | 1 +
.../StreamingTombstoneHistogramBuilder.java | 437 +++++++++------------
.../utils/streamhist/TombstoneHistogram.java | 7 +-
.../StreamingTombstoneHistogramBuilderTest.java | 232 ++++++++++-
4 files changed, 423 insertions(+), 254 deletions(-)
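
On the "separate arrays" point above: the spool is an open-addressing scratch buffer that batches points before they are flushed into the histogram bin, and the patch splits its single interleaved array into two parallel int arrays. A rough sketch of that idea, with simplified hashing and capacity handling (illustrative only; the real Spool rounds its capacity up to a power of two and gives up after 100 probes):

    import java.util.Arrays;

    // Rough sketch of the parallel-array spool idea from the patch (not the actual implementation).
    final class SpoolSketch
    {
        final int[] points;   // tombstone deletion times; -1 marks an empty cell
        final int[] values;   // occurrence count for the point stored in the same slot
        final int capacity;
        int size;

        SpoolSketch(int capacity)
        {
            this.capacity = capacity;
            this.points = new int[capacity * 2];   // oversize to keep probe sequences short
            this.values = new int[capacity * 2];
            Arrays.fill(points, -1);
        }

        // Linear-probing insert-or-accumulate; returns false when the spool should be
        // flushed into the histogram bin and the insert retried.
        boolean tryAddOrAccumulate(int point, int delta)
        {
            if (size >= capacity)
                return false;
            int start = Math.floorMod(point * 0x9E3779B9, points.length);
            for (int attempt = 0; attempt < points.length; attempt++)
            {
                int i = (start + attempt) % points.length;
                if (points[i] == -1)
                {
                    points[i] = point;
                    values[i] = delta;
                    size++;
                    return true;
                }
                if (points[i] == point)
                {
                    values[i] += delta;   // the patched code saturates this addition
                    return true;
                }
            }
            return false;
        }
    }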
diff --git a/CHANGES.txt b/CHANGES.txt
index 86a8813..46790c9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha4
+ * Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times (CASSANDRA-14773)
* Mark system_views/system_virtual_schema as system keyspaces in cqlsh (CASSANDRA-15706)
* Avoid unnecessary collection/iterator allocations during btree construction (CASSANDRA-15390)
* Repair history tables should have TTL and TWCS (CASSANDRA-12701)
diff --git a/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java b/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java
index 9856253..eda88bc 100755
--- a/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java
+++ b/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java
@@ -22,28 +22,32 @@ import java.math.RoundingMode;
import java.util.Arrays;
import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.math.IntMath;
-import static org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder.AddResult.ACCUMULATED;
-import static org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder.AddResult.INSERTED;
+import org.apache.cassandra.db.rows.Cell;
/**
* Histogram that can be constructed from streaming of data.
+ *
+ * The histogram is used, for example, to retrieve the number of droppable tombstones via
+ * {@link org.apache.cassandra.io.sstable.format.SSTableReader#getDroppableTombstonesBefore(int)}.
* <p>
- * The original algorithm is taken from following paper:
- * Yael Ben-Haim and Elad Tom-Tov, "A Streaming Parallel Decision Tree Algorithm" (2010)
- * http://jmlr.csail.mit.edu/papers/volume11/ben-haim10a/ben-haim10a.pdf
+ * When an sstable is written (or streamed), this histogram-builder receives the "local deletion timestamp"
+ * as an {@code int} via {@link #update(int)}. Negative values are not supported.
* <p>
* Algorithm: Histogram is represented as collection of {point, weight} pairs. When new point <i>p</i> with weight <i>m</i> is added:
* <ol>
- * <li>If point <i>p</i> is already exists in collection, add <i>m</i> to recorded value of point <i>p</i> </li>
- * <li>If there is no point <i>p</i> in the collection, add point <i>p</i> with weight <i>m</i> </li>
- * <li>If point was added and collection size became lorger than maxBinSize:</li>
- * <ol type="a">
- * <li>Find nearest points <i>p1</i> and <i>p2</i> in the collection </li>
- * <li>Replace theese two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2)</i></li>
+ * <li>If point <i>p</i> already exists in the collection, add <i>m</i> to the recorded value of point <i>p</i></li>
+ * <li>If there is no point <i>p</i> in the collection, add point <i>p</i> with weight <i>m</i></li>
+ * <li>If a point was added and the collection size became larger than maxBinSize:</li>
* </ol>
+ *
+ * <ol type="a">
+ * <li>Find the nearest points <i>p1</i> and <i>p2</i> in the collection</li>
+ * <li>Replace these two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(m1+m2)</i></li>
* </ol>
+ *
* <p>
* There are some optimizations to make the histogram builder faster:
* <ol>
@@ -51,19 +55,19 @@ import static org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramB
* For example, if spoolSize=100, binSize=10 and there are only 50 different points, there will be only 40 merges regardless of how many points are added.</li>
* <li>Spool is organized as an open-addressing primitive hash map where odd elements are points and even elements are values.
* Spool can not resize => when the number of collisions becomes bigger than a threshold or the size becomes larger than <i>array_size/2</i>, Spool is drained to the bin</li>
- * <li>DistanceHolder - sorted collection of distances between points in Bin. It is used to find nearest points in constant time</li>
- * <li>Distances and Bin organized as sorted arrays. It reduces garbage collection pressure and allows to find elements in log(binSize) time via binary search</li>
- * <li>To use existing Arrays.binarySearch <i></>{point, values}</i> in bin and <i></>{distance, left_point}</i> pairs is packed in one long</li>
+ * <li>Bin is organized as sorted arrays. It reduces garbage collection pressure and allows finding elements in log(binSize) time via binary search</li>
+ * <li>To reuse the existing Arrays.binarySearch, each <i>{point, value}</i> pair in the bin is packed into one long</li>
* </ol>
+ * <p>
+ * The original algorithm is taken from the following paper:
+ * Yael Ben-Haim and Elad Tom-Tov, "A Streaming Parallel Decision Tree Algorithm" (2010)
+ * http://jmlr.csail.mit.edu/papers/volume11/ben-haim10a/ben-haim10a.pdf
*/
public class StreamingTombstoneHistogramBuilder
{
// Buffer with point-value pair
private final DataHolder bin;
- // Buffer with distance between points, sorted from nearest to furthest
- private final DistanceHolder distances;
-
// Keep a second, larger buffer to spool data in, before finalizing it into `bin`
private final Spool spool;
@@ -72,47 +76,42 @@ public class StreamingTombstoneHistogramBuilder
public StreamingTombstoneHistogramBuilder(int maxBinSize, int maxSpoolSize, int roundSeconds)
{
+ assert maxBinSize > 0 && maxSpoolSize >= 0 && roundSeconds > 0: "Invalid arguments: maxBinSize:" + maxBinSize + " maxSpoolSize:" + maxSpoolSize + " delta:" + roundSeconds;
+
this.roundSeconds = roundSeconds;
this.bin = new DataHolder(maxBinSize + 1, roundSeconds);
- distances = new DistanceHolder(maxBinSize);
-
- //for spool we need power-of-two cells
- maxSpoolSize = maxSpoolSize == 0 ? 0 : IntMath.pow(2, IntMath.log2(maxSpoolSize, RoundingMode.CEILING));
- spool = new Spool(maxSpoolSize);
+ this.spool = new Spool(maxSpoolSize);
}
/**
- * Adds new point p to this histogram.
+ * Adds new point to this histogram with a default value of 1.
*
- * @param p
+ * @param point the point to be added
*/
- public void update(int p)
+ public void update(int point)
{
- update(p, 1);
+ update(point, 1);
}
/**
- * Adds new point p with value m to this histogram.
- *
- * @param p
- * @param m
+ * Adds new point {@param point} with value {@param value} to this histogram.
*/
- public void update(int p, int m)
+ public void update(int point, int value)
{
- p = roundKey(p, roundSeconds);
+ point = ceilKey(point, roundSeconds);
if (spool.capacity > 0)
{
- if (!spool.tryAddOrAccumulate(p, m))
+ if (!spool.tryAddOrAccumulate(point, value))
{
flushHistogram();
- final boolean success = spool.tryAddOrAccumulate(p, m);
+ final boolean success = spool.tryAddOrAccumulate(point, value);
assert success : "Can not add value to spool"; // after spool flushing we should always be able to insert new value
}
}
else
{
- flushValue(p, m);
+ flushValue(point, value);
}
}
@@ -127,50 +126,11 @@ public class StreamingTombstoneHistogramBuilder
private void flushValue(int key, int spoolValue)
{
- DataHolder.NeighboursAndResult addResult = bin.addValue(key, spoolValue);
- if (addResult.result == INSERTED)
- {
- final int prevPoint = addResult.prevPoint;
- final int nextPoint = addResult.nextPoint;
- if (prevPoint != -1 && nextPoint != -1)
- distances.remove(prevPoint, nextPoint);
- if (prevPoint != -1)
- distances.add(prevPoint, key);
- if (nextPoint != -1)
- distances.add(key, nextPoint);
- }
+ bin.addValue(key, spoolValue);
if (bin.isFull())
{
- mergeBin();
- }
- }
-
- private void mergeBin()
- {
- // find points point1, point2 which have smallest difference
- final int[] smallestDifference = distances.getFirstAndRemove();
-
- final int point1 = smallestDifference[0];
- final int point2 = smallestDifference[1];
-
- // merge those two
- DataHolder.MergeResult mergeResult = bin.merge(point1, point2);
-
- final int nextPoint = mergeResult.nextPoint;
- final int prevPoint = mergeResult.prevPoint;
- final int newPoint = mergeResult.newPoint;
-
- if (nextPoint != -1)
- {
- distances.remove(point2, nextPoint);
- distances.add(newPoint, nextPoint);
- }
-
- if (prevPoint != -1)
- {
- distances.remove(prevPoint, point1);
- distances.add(prevPoint, newPoint);
+ bin.mergeNearestPoints();
}
}
@@ -185,72 +145,10 @@ public class StreamingTombstoneHistogramBuilder
return new TombstoneHistogram(bin);
}
- private static class DistanceHolder
- {
- private static final long EMPTY = Long.MAX_VALUE;
- private final long[] data;
-
- DistanceHolder(int maxCapacity)
- {
- data = new long[maxCapacity];
- Arrays.fill(data, EMPTY);
- }
-
- void add(int prev, int next)
- {
- long key = getKey(prev, next);
- int index = Arrays.binarySearch(data, key);
-
- assert (index < 0) : "Element already exists";
- assert (data[data.length - 1] == EMPTY) : "No more space in array";
-
- index = -index - 1;
- System.arraycopy(data, index, data, index + 1, data.length - index - 1);
- data[index] = key;
- }
-
- void remove(int prev, int next)
- {
- long key = getKey(prev, next);
- int index = Arrays.binarySearch(data, key);
- if (index >= 0)
- {
- if (index < data.length)
- System.arraycopy(data, index + 1, data, index, data.length - index - 1);
- data[data.length - 1] = EMPTY;
- }
- }
-
- int[] getFirstAndRemove()
- {
- if (data[0] == EMPTY)
- return null;
-
- int[] result = unwrapKey(data[0]);
- System.arraycopy(data, 1, data, 0, data.length - 1);
- data[data.length - 1] = EMPTY;
- return result;
- }
-
- private int[] unwrapKey(long key)
- {
- final int distance = (int) (key >> 32);
- final int prev = (int) (key & 0xFF_FF_FF_FFL);
- return new int[]{ prev, prev + distance };
- }
-
- private long getKey(int prev, int next)
- {
- long distance = next - prev;
- return (distance << 32) | prev;
- }
-
- public String toString()
- {
- return Arrays.stream(data).filter(x -> x != EMPTY).boxed().map(this::unwrapKey).map(Arrays::toString).collect(Collectors.joining());
- }
- }
-
+ /**
+ * An ordered collection of histogram buckets; each entry in the collection represents a pair (bucket, count).
+ * Once the collection is full, it merges the closest buckets using a weighted approach; see {@link #mergeNearestPoints()}.
+ */
static class DataHolder
{
private static final long EMPTY = Long.MAX_VALUE;
@@ -270,11 +168,29 @@ public class StreamingTombstoneHistogramBuilder
roundSeconds = holder.roundSeconds;
}
- NeighboursAndResult addValue(int point, int delta)
+ @VisibleForTesting
+ int getValue(int point)
+ {
+ long key = wrap(point, 0);
+ int index = Arrays.binarySearch(data, key);
+ if (index < 0)
+ index = -index - 1;
+ if (index >= data.length)
+ return -1; // not-found sentinel
+ if (unwrapPoint(data[index]) != point)
+ return -2; // not-found sentinel
+ return unwrapValue(data[index]);
+ }
+
+ /**
+ * Adds value {@code delta} to the point {@code point}.
+ *
+ * @return {@code true} if inserted, {@code false} if accumulated
+ */
+ boolean addValue(int point, int delta)
{
long key = wrap(point, 0);
int index = Arrays.binarySearch(data, key);
- AddResult addResult;
if (index < 0)
{
index = -index - 1;
@@ -287,25 +203,35 @@ public class StreamingTombstoneHistogramBuilder
System.arraycopy(data, index, data, index + 1, data.length - index - 1);
data[index] = wrap(point, delta);
- addResult = INSERTED;
+ return true;
}
else
{
- data[index] += delta;
- addResult = ACCUMULATED;
+ data[index] = wrap(point, (long) unwrapValue(data[index]) + delta);
}
}
else
{
- data[index] += delta;
- addResult = ACCUMULATED;
+ data[index] = wrap(point, (long) unwrapValue(data[index]) + delta);
}
- return new NeighboursAndResult(getPrevPoint(index), getNextPoint(index), addResult);
+ return false;
}
- public MergeResult merge(int point1, int point2)
+ /**
+ * Finds the nearest points <i>p1</i> and <i>p2</i> in the collection and
+ * replaces these two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(m1+m2)</i>.
+ */
+ @VisibleForTesting
+ void mergeNearestPoints()
{
+ assert isFull() : "DataHolder must be full in order to merge two points";
+
+ final int[] smallestDifference = findPointPairWithSmallestDistance();
+
+ final int point1 = smallestDifference[0];
+ final int point2 = smallestDifference[1];
+
long key = wrap(point1, 0);
int index = Arrays.binarySearch(data, key);
if (index < 0)
@@ -315,47 +241,44 @@ public class StreamingTombstoneHistogramBuilder
assert (unwrapPoint(data[index]) == point1) : "Not found in array";
}
- final int prevPoint = getPrevPoint(index);
- final int nextPoint = getNextPoint(index + 1);
-
- int value1 = unwrapValue(data[index]);
- int value2 = unwrapValue(data[index + 1]);
+ long value1 = unwrapValue(data[index]);
+ long value2 = unwrapValue(data[index + 1]);
assert (unwrapPoint(data[index + 1]) == point2) : "point2 should follow point1";
- int sum = value1 + value2;
+ long sum = value1 + value2;
//let's evaluate in long values to handle overflow in multiplication
- int newPoint = (int) (((long) point1 * value1 + (long) point2 * value2) / (value1 + value2));
- newPoint = roundKey(newPoint, roundSeconds);
- data[index] = wrap(newPoint, sum);
+ int newPoint = saturatingCastToInt((point1 * value1 + point2 * value2) / sum);
+ newPoint = ceilKey(newPoint, roundSeconds);
+ data[index] = wrap(newPoint, saturatingCastToInt(sum));
System.arraycopy(data, index + 2, data, index + 1, data.length - index - 2);
data[data.length - 1] = EMPTY;
-
- return new MergeResult(prevPoint, newPoint, nextPoint);
}
- private int getPrevPoint(int index)
+ private int[] findPointPairWithSmallestDistance()
{
- if (index > 0)
- if (data[index - 1] != EMPTY)
- return (int) (data[index - 1] >> 32);
- else
- return -1;
- else
- return -1;
- }
+ assert isFull(): "The DataHolder must be full in order to find the closest pair of points";
- private int getNextPoint(int index)
- {
- if (index < data.length - 1)
- if (data[index + 1] != EMPTY)
- return (int) (data[index + 1] >> 32);
- else
- return -1;
- else
- return -1;
+ int point1 = 0;
+ int point2 = Integer.MAX_VALUE;
+
+ for (int i = 0; i < data.length - 1; i++)
+ {
+ int pointA = unwrapPoint(data[i]);
+ int pointB = unwrapPoint(data[i + 1]);
+
+ assert pointB > pointA : "DataHolder not sorted, p2(" + pointB +") < p1(" + pointA + ") for " + this;
+
+ if (point2 - point1 > pointB - pointA)
+ {
+ point1 = pointA;
+ point2 = pointB;
+ }
+ }
+
+ return new int[]{point1, point2};
}
private int[] unwrap(long key)
@@ -375,15 +298,15 @@ public class StreamingTombstoneHistogramBuilder
return (int) (key & 0xFF_FF_FF_FFL);
}
- private long wrap(int point, int value)
+ private long wrap(int point, long value)
{
- return (((long) point) << 32) | value;
+ assert point >= 0 : "Invalid argument: point:" + point;
+ return (((long) point) << 32) | saturatingCastToInt(value);
}
-
public String toString()
{
- return Arrays.stream(data).filter(x -> x != EMPTY).boxed().map(this::unwrap).map(Arrays::toString).collect(Collectors.joining());
+ return Arrays.stream(data).filter(x -> x != EMPTY).mapToObj(this::unwrap).map(Arrays::toString).collect(Collectors.joining());
}
public boolean isFull()
@@ -434,6 +357,7 @@ public class StreamingTombstoneHistogramBuilder
{
final int prevPoint = unwrapPoint(data[i - 1]);
final int prevValue = unwrapValue(data[i - 1]);
+ // calculate estimated count mb for point b
double weight = (b - prevPoint) / (double) (point - prevPoint);
double mb = prevValue + (value - prevValue) * weight;
sum -= prevValue;
@@ -450,34 +374,6 @@ public class StreamingTombstoneHistogramBuilder
return sum;
}
- static class MergeResult
- {
- int prevPoint;
- int newPoint;
- int nextPoint;
-
- MergeResult(int prevPoint, int newPoint, int nextPoint)
- {
- this.prevPoint = prevPoint;
- this.newPoint = newPoint;
- this.nextPoint = nextPoint;
- }
- }
-
- static class NeighboursAndResult
- {
- int prevPoint;
- int nextPoint;
- AddResult result;
-
- NeighboursAndResult(int prevPoint, int nextPoint, AddResult result)
- {
- this.prevPoint = prevPoint;
- this.nextPoint = nextPoint;
- this.result = result;
- }
- }
-
@Override
public int hashCode()
{
@@ -506,38 +402,42 @@ public class StreamingTombstoneHistogramBuilder
}
}
- public enum AddResult
- {
- INSERTED,
- ACCUMULATED
- }
-
+ /**
+ * This class is a specialized open-addressing hash map that uses ints as keys and ints as values.
+ * This is an optimization to avoid allocating objects.
+ * In order for this class to work correctly, it must have a power-of-two capacity.
+ * This last invariant is taken care of during construction.
+ */
static class Spool
{
- // odd elements - points, even elements - values
- final int[] map;
+ final int[] points;
+ final int[] values;
+
final int capacity;
int size;
- Spool(int capacity)
+ Spool(int requestedCapacity)
{
- this.capacity = capacity;
- if (capacity == 0)
- {
- map = new int[0];
- }
- else
- {
- assert IntMath.isPowerOfTwo(capacity) : "should be power of two";
- // x2 because we want to save points and values in consecutive cells and x2 because we want reprobing less that two when _capacity_ values will be written
- map = new int[capacity * 2 * 2];
- clear();
- }
+ if (requestedCapacity < 0)
+ throw new IllegalArgumentException("Illegal capacity " + requestedCapacity);
+
+ this.capacity = getPowerOfTwoCapacity(requestedCapacity);
+
+ // x2 because we want no more than two reprobes on average when _capacity_ entries will be written
+ points = new int[capacity * 2];
+ values = new int[capacity * 2];
+ clear();
+ }
+
+ private int getPowerOfTwoCapacity(int requestedCapacity)
+ {
+ //for spool we need power-of-two cells
+ return requestedCapacity == 0 ? 0 : IntMath.pow(2, IntMath.log2(requestedCapacity, RoundingMode.CEILING));
}
void clear()
{
- Arrays.fill(map, -1);
+ Arrays.fill(points, -1);
size = 0;
}
@@ -548,12 +448,12 @@ public class StreamingTombstoneHistogramBuilder
return false;
}
- final int cell = 2 * ((capacity - 1) & hash(point));
+ final int cell = (capacity - 1) & hash(point);
// We use linear scanning. I think cluster of 100 elements is large enough to give up.
for (int attempt = 0; attempt < 100; attempt++)
{
- if (tryCell(cell + attempt * 2, point, delta))
+ if (tryCell(cell + attempt, point, delta))
return true;
}
return false;
@@ -567,40 +467,75 @@ public class StreamingTombstoneHistogramBuilder
<E extends Exception> void forEach(HistogramDataConsumer<E> consumer) throws E
{
- for (int i = 0; i < map.length; i += 2)
+ for (int i = 0; i < points.length; i++)
{
- if (map[i] != -1)
+ if (points[i] != -1)
{
- consumer.consume(map[i], map[i + 1]);
+ consumer.consume(points[i], values[i]);
}
}
}
private boolean tryCell(int cell, int point, int delta)
{
- cell = cell % map.length;
- if (map[cell] == -1)
+ assert cell >= 0 && point >= 0 && delta >= 0 : "Invalid arguments: cell:" + cell + " point:" + point + " delta:" + delta;
+
+ cell = cell % points.length;
+ if (points[cell] == -1)
{
- map[cell] = point;
- map[cell + 1] = delta;
+ points[cell] = point;
+ values[cell] = delta;
size++;
return true;
}
- if (map[cell] == point)
+ if (points[cell] == point)
{
- map[cell + 1] += delta;
+ values[cell] = saturatingCastToInt((long) values[cell] + (long) delta);
return true;
}
return false;
}
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append('[');
+ for (int i = 0; i < points.length; i++)
+ {
+ if (points[i] == -1)
+ continue;
+ if (sb.length() > 1)
+ sb.append(", ");
+ sb.append('[').append(points[i]).append(',').append(values[i]).append(']');
+ }
+ sb.append(']');
+ return sb.toString();
+ }
}
- private static int roundKey(int p, int roundSeconds)
+ private static int ceilKey(int point, int bucketSize)
{
- int d = p % roundSeconds;
- if (d > 0)
- return p + (roundSeconds - d);
- else
- return p;
+ int delta = point % bucketSize;
+
+ if (delta == 0)
+ return point;
+
+ return saturatingCastToMaxDeletionTime((long) point + (long) bucketSize - (long) delta);
+ }
+
+ public static int saturatingCastToInt(long value)
+ {
+ return (int) (value > Integer.MAX_VALUE ? Integer.MAX_VALUE : value);
+ }
+
+ /**
+ * Cast to an int with maximum value of {@code Cell.MAX_DELETION_TIME} to avoid representing values that
+ * aren't a tombstone
+ */
+ public static int saturatingCastToMaxDeletionTime(long value)
+ {
+ return (value < 0L || value > Cell.MAX_DELETION_TIME)
+ ? Cell.MAX_DELETION_TIME
+ : (int) value;
}
}
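
For reference, the core of mergeNearestPoints() above is a weighted average evaluated in long arithmetic so that the point*count products cannot overflow before the division. A compact, self-contained illustration of that step (the clamping helpers and the hard-coded bound mirror the casts sketched earlier in this message; values are illustrative):

    import java.util.Arrays;

    // Illustration of the weighted merge performed when the bin overflows.
    final class MergeSketch
    {
        static final int MAX_DELETION_TIME = Integer.MAX_VALUE - 1; // assumed stand-in for Cell.MAX_DELETION_TIME

        static int clampToInt(long v)          { return (int) Math.min(v, Integer.MAX_VALUE); }
        static int clampToDeletionTime(long v) { return (v < 0 || v > MAX_DELETION_TIME) ? MAX_DELETION_TIME : (int) v; }

        // Merge two adjacent buckets (p1, m1) and (p2, m2) into one weighted bucket.
        static int[] merge(int p1, long m1, int p2, long m2, int roundSeconds)
        {
            long sum = m1 + m2;
            // Evaluate in longs: p1 * m1 can exceed Integer.MAX_VALUE even for valid inputs.
            int newPoint = clampToInt((p1 * m1 + p2 * m2) / sum);
            int delta = newPoint % roundSeconds;
            if (delta != 0) // round up to the next bucket boundary, clamped to the max deletion time
                newPoint = clampToDeletionTime((long) newPoint + roundSeconds - delta);
            return new int[]{ newPoint, clampToInt(sum) };
        }

        public static void main(String[] args)
        {
            // Merging (4, 2) and (5, 2) with roundSeconds = 1 yields point 4 with count 4,
            // matching the testDataHolder expectation in the unit test further down.
            System.out.println(Arrays.toString(merge(4, 2, 5, 2, 1))); // [4, 4]
        }
    }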
diff --git a/src/java/org/apache/cassandra/utils/streamhist/TombstoneHistogram.java b/src/java/org/apache/cassandra/utils/streamhist/TombstoneHistogram.java
index 19bdd27..5f2787b 100755
--- a/src/java/org/apache/cassandra/utils/streamhist/TombstoneHistogram.java
+++ b/src/java/org/apache/cassandra/utils/streamhist/TombstoneHistogram.java
@@ -91,7 +91,12 @@ public class TombstoneHistogram
DataHolder dataHolder = new DataHolder(size, 1);
for (int i = 0; i < size; i++)
{
- dataHolder.addValue((int)in.readDouble(), (int)in.readLong());
+ // Already serialized sstable metadata may contain negative deletion-time values (see CASSANDRA-14092).
+ // Just do a "safe cast" and it should be good. For safety, also do that for the 'value' (tombstone count).
+ int localDeletionTime = StreamingTombstoneHistogramBuilder.saturatingCastToMaxDeletionTime((long) in.readDouble());
+ int count = StreamingTombstoneHistogramBuilder.saturatingCastToInt(in.readLong());
+
+ dataHolder.addValue(localDeletionTime, count);
}
return new TombstoneHistogram(dataHolder);
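
For completeness, a hypothetical end-to-end usage sketch of the patched builder (constructor arguments and deletion times below are illustrative only, not values Cassandra uses internally):

    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder;
    import org.apache.cassandra.utils.streamhist.TombstoneHistogram;

    public class HistogramUsageSketch
    {
        public static void main(String[] args)
        {
            // maxBinSize, maxSpoolSize, roundSeconds -- illustrative values only
            StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(100, 1000, 60);

            // Local deletion times (in seconds) observed while writing an sstable; points near
            // Integer.MAX_VALUE no longer overflow thanks to the saturating casts above.
            builder.update(1_586_000_000);
            builder.update(1_586_000_030);
            builder.update(Integer.MAX_VALUE - 10, 3); // a point with an explicit count

            TombstoneHistogram histogram = builder.build();

            Map<Integer, Integer> buckets = new TreeMap<>();
            histogram.forEach(buckets::put); // (point, count) pairs, as in the unit tests below
            buckets.forEach((point, count) -> System.out.println(point + " -> " + count));
        }
    }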
diff --git a/test/unit/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilderTest.java b/test/unit/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilderTest.java
index c4da5cb..596c4d7 100755
--- a/test/unit/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilderTest.java
+++ b/test/unit/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilderTest.java
@@ -20,15 +20,26 @@ package org.apache.cassandra.utils.streamhist;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import org.junit.Test;
+import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.psjava.util.AssertStatus;
+import org.quicktheories.core.Gen;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.quicktheories.QuickTheory.qt;
+import static org.quicktheories.generators.SourceDSL.integers;
+import static org.quicktheories.generators.SourceDSL.lists;
public class StreamingTombstoneHistogramBuilderTest
{
@@ -111,11 +122,12 @@ public class StreamingTombstoneHistogramBuilderTest
builder.update(2);
builder.update(2);
builder.update(2);
+ builder.update(2, Integer.MAX_VALUE); // To check that value overflow is handled correctly
TombstoneHistogram hist = builder.build();
Map<Integer, Integer> asMap = asMap(hist);
assertEquals(1, asMap.size());
- assertEquals(3, asMap.get(2).intValue());
+ assertEquals(Integer.MAX_VALUE, asMap.get(2).intValue());
//Make sure it's working with Serde
DataOutputBuffer out = new DataOutputBuffer();
@@ -126,7 +138,7 @@ public class StreamingTombstoneHistogramBuilderTest
asMap = asMap(deserialized);
assertEquals(1, deserialized.size());
- assertEquals(3, asMap.get(2).intValue());
+ assertEquals(Integer.MAX_VALUE, asMap.get(2).intValue());
}
@Test
@@ -167,10 +179,226 @@ public class StreamingTombstoneHistogramBuilderTest
IntStream.range(Integer.MAX_VALUE - 30, Integer.MAX_VALUE).forEach(builder::update);
}
+ @Test
+ public void testLargeDeletionTimesAndLargeValuesDontCauseOverflow()
+ {
+ qt().forAll(streamingTombstoneHistogramBuilderGen(1000, 300000, 60),
+ lists().of(integers().from(0).upTo(Cell.MAX_DELETION_TIME)).ofSize(300),
+ lists().of(integers().allPositive()).ofSize(300))
+ .checkAssert(this::updateHistogramAndCheckAllBucketsArePositive);
+ }
+
+ private void updateHistogramAndCheckAllBucketsArePositive(StreamingTombstoneHistogramBuilder histogramBuilder, List<Integer> keys, List<Integer> values)
+ {
+ for (int i = 0; i < keys.size(); i++)
+ {
+ histogramBuilder.update(keys.get(i), values.get(i));
+ }
+
+ TombstoneHistogram histogram = histogramBuilder.build();
+ for (Map.Entry<Integer, Integer> buckets : asMap(histogram).entrySet())
+ {
+ assertTrue("Invalid bucket key", buckets.getKey() >= 0);
+ assertTrue("Invalid bucket value", buckets.getValue() >= 0);
+ }
+ }
+
+ @Test
+ public void testThatPointIsNotMissedBecauseOfRoundingToNoDeletionTime() throws Exception
+ {
+ int pointThatRoundedToNoDeletion = Cell.NO_DELETION_TIME - 2;
+ assert pointThatRoundedToNoDeletion + pointThatRoundedToNoDeletion % 3 == Cell.NO_DELETION_TIME : "test data should be valid";
+
+ StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(5, 10, 3);
+ builder.update(pointThatRoundedToNoDeletion);
+
+ TombstoneHistogram histogram = builder.build();
+
+ Map<Integer, Integer> integerIntegerMap = asMap(histogram);
+ assertEquals(integerIntegerMap.size(), 1);
+ assertEquals(integerIntegerMap.get(Cell.MAX_DELETION_TIME).intValue(), 1);
+ }
+
+ @Test
+ public void testInvalidArguments()
+ {
+ assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(5, 10, 0)).hasMessage("Invalid arguments: maxBinSize:5 maxSpoolSize:10 delta:0");
+ assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(5, 10, -1)).hasMessage("Invalid arguments: maxBinSize:5 maxSpoolSize:10 delta:-1");
+ assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(5, -1, 60)).hasMessage("Invalid arguments: maxBinSize:5 maxSpoolSize:-1 delta:60");
+ assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(-1, 10, 60)).hasMessage("Invalid arguments: maxBinSize:-1 maxSpoolSize:10 delta:60");
+ assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(0, 10, 60)).hasMessage("Invalid arguments: maxBinSize:0 maxSpoolSize:10 delta:60");
+ }
+
+ @Test
+ public void testSpool()
+ {
+ StreamingTombstoneHistogramBuilder.Spool spool = new StreamingTombstoneHistogramBuilder.Spool(8);
+ assertTrue(spool.tryAddOrAccumulate(5, 1));
+ assertSpool(spool, 5, 1);
+ assertTrue(spool.tryAddOrAccumulate(5, 3));
+ assertSpool(spool, 5, 4);
+
+ assertTrue(spool.tryAddOrAccumulate(10, 1));
+ assertSpool(spool, 5, 4,
+ 10, 1);
+
+ assertTrue(spool.tryAddOrAccumulate(12, 1));
+ assertTrue(spool.tryAddOrAccumulate(14, 1));
+ assertTrue(spool.tryAddOrAccumulate(16, 1));
+ assertSpool(spool, 5, 4,
+ 10, 1,
+ 12, 1,
+ 14, 1,
+ 16, 1);
+
+ assertTrue(spool.tryAddOrAccumulate(18, 1));
+ assertTrue(spool.tryAddOrAccumulate(20, 1));
+ assertTrue(spool.tryAddOrAccumulate(30, 1));
+ assertSpool(spool, 5, 4,
+ 10, 1,
+ 12, 1,
+ 14, 1,
+ 16, 1,
+ 18, 1,
+ 20, 1,
+ 30, 1);
+
+ assertTrue(spool.tryAddOrAccumulate(16, 5));
+ assertTrue(spool.tryAddOrAccumulate(12, 4));
+ assertTrue(spool.tryAddOrAccumulate(18, 9));
+ assertSpool(spool,
+ 5, 4,
+ 10, 1,
+ 12, 5,
+ 14, 1,
+ 16, 6,
+ 18, 10,
+ 20, 1,
+ 30, 1);
+
+ assertTrue(spool.tryAddOrAccumulate(99, 5));
+ }
+
+ @Test
+ public void testDataHolder()
+ {
+ StreamingTombstoneHistogramBuilder.DataHolder dataHolder = new StreamingTombstoneHistogramBuilder.DataHolder(4, 1);
+ assertFalse(dataHolder.isFull());
+ assertEquals(0, dataHolder.size());
+
+ assertTrue(dataHolder.addValue(4, 1));
+ assertDataHolder(dataHolder,
+ 4, 1);
+
+ assertFalse(dataHolder.addValue(4, 1));
+ assertDataHolder(dataHolder,
+ 4, 2);
+
+ assertTrue(dataHolder.addValue(7, 1));
+ assertDataHolder(dataHolder,
+ 4, 2,
+ 7, 1);
+
+ assertFalse(dataHolder.addValue(7, 1));
+ assertDataHolder(dataHolder,
+ 4, 2,
+ 7, 2);
+
+ assertTrue(dataHolder.addValue(5, 1));
+ assertDataHolder(dataHolder,
+ 4, 2,
+ 5, 1,
+ 7, 2);
+
+ assertFalse(dataHolder.addValue(5, 1));
+ assertDataHolder(dataHolder,
+ 4, 2,
+ 5, 2,
+ 7, 2);
+
+ assertTrue(dataHolder.addValue(2, 1));
+ assertDataHolder(dataHolder,
+ 2, 1,
+ 4, 2,
+ 5, 2,
+ 7, 2);
+ assertTrue(dataHolder.isFull());
+
+ // expect to merge [4,2]+[5,2]
+ dataHolder.mergeNearestPoints();
+ assertDataHolder(dataHolder,
+ 2, 1,
+ 4, 4,
+ 7, 2);
+
+ assertFalse(dataHolder.addValue(2, 1));
+ assertDataHolder(dataHolder,
+ 2, 2,
+ 4, 4,
+ 7, 2);
+
+ dataHolder.addValue(8, 1);
+ assertDataHolder(dataHolder,
+ 2, 2,
+ 4, 4,
+ 7, 2,
+ 8, 1);
+ assertTrue(dataHolder.isFull());
+
+ // expect to merge [7,2]+[8,1]
+ dataHolder.mergeNearestPoints();
+ assertDataHolder(dataHolder,
+ 2, 2,
+ 4, 4,
+ 7, 3);
+ }
+
+ private static void assertDataHolder(StreamingTombstoneHistogramBuilder.DataHolder dataHolder, int... pointValue)
+ {
+ assertEquals(pointValue.length / 2, dataHolder.size());
+
+ for (int i = 0; i < pointValue.length; i += 2)
+ {
+ int point = pointValue[i];
+ int expectedValue = pointValue[i + 1];
+ assertEquals(expectedValue, dataHolder.getValue(point));
+ }
+ }
+
+ /**
+ * Compare the contents of {@code spool} with the given collection of key-value pairs in {@code pairs}.
+ */
+ private static void assertSpool(StreamingTombstoneHistogramBuilder.Spool spool, int... pairs)
+ {
+ assertEquals(pairs.length / 2, spool.size);
+ Map<Integer, Integer> tests = new HashMap<>();
+ for (int i = 0; i < pairs.length; i += 2)
+ tests.put(pairs[i], pairs[i + 1]);
+
+ spool.forEach((k, v) -> {
+ Integer x = tests.remove(k);
+ assertNotNull("key " + k, x);
+ assertEquals(x.intValue(), v);
+ });
+ AssertStatus.assertTrue(tests.isEmpty());
+ }
+
private Map<Integer, Integer> asMap(TombstoneHistogram histogram)
{
Map<Integer, Integer> result = new HashMap<>();
histogram.forEach(result::put);
return result;
}
+
+ private Gen<StreamingTombstoneHistogramBuilder> streamingTombstoneHistogramBuilderGen(int maxBinSize, int maxSpoolSize, int maxRoundSeconds)
+ {
+ return positiveIntegerUpTo(maxBinSize).zip(integers().between(0, maxSpoolSize),
+ positiveIntegerUpTo(maxRoundSeconds),
+ StreamingTombstoneHistogramBuilder::new);
+ }
+
+ private Gen<Integer> positiveIntegerUpTo(int upperBound)
+ {
+ return integers().between(1, upperBound);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org