You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by le...@apache.org on 2020/08/25 18:44:24 UTC
[incubator-datasketches-java] 05/05: Initial implementation of
RelativeErrorQuantiles Sketch
This is an automated email from the ASF dual-hosted git repository.
leerho pushed a commit to branch RelativeErrorQuantiles
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-java.git
commit 2092f2590056add0fb6d49f330a32e50be75c828
Author: Lee Rhodes <le...@users.noreply.github.com>
AuthorDate: Tue Aug 25 11:43:33 2020 -0700
Initial implementation of RelativeErrorQuantiles Sketch
---
.../apache/datasketches/kll/KllFloatsSketch.java | 15 +-
.../java/org/apache/datasketches/req/Buffer.java | 136 ++++++-------
.../apache/datasketches/req/RelativeCompactor.java | 215 ++++++++++++++-------
...rrorSketch.java => RelativeErrorQuantiles.java} | 153 ++++++++++-----
.../req/RelativeErrorSketchIterator.java | 10 +-
.../org/apache/datasketches/req/BufferTest.java | 40 ++--
...rSketchTest.java => RelativeCompactorTest.java} | 12 +-
.../datasketches/req/RelativeErrorSketchTest.java | 32 ++-
8 files changed, 367 insertions(+), 246 deletions(-)
diff --git a/src/main/java/org/apache/datasketches/kll/KllFloatsSketch.java b/src/main/java/org/apache/datasketches/kll/KllFloatsSketch.java
index e611233..f29fda4 100644
--- a/src/main/java/org/apache/datasketches/kll/KllFloatsSketch.java
+++ b/src/main/java/org/apache/datasketches/kll/KllFloatsSketch.java
@@ -247,8 +247,8 @@ public class KllFloatsSketch {
// Specific to the floats sketch
private float[] items_; // the continuous array of float items
- private float minValue_;
- private float maxValue_;
+ private float minValue_ = Float.MAX_VALUE;
+ private float maxValue_ = Float.MIN_VALUE;
/**
* Heap constructor with the default <em>k = 200</em>, which has a rank error of about 1.65%.
@@ -888,14 +888,9 @@ public class KllFloatsSketch {
* @param value an item from a stream of items. NaNs are ignored.
*/
public void update(final float value) {
- if (Float.isNaN(value)) { return; }
- if (isEmpty()) {
- minValue_ = value;
- maxValue_ = value;
- } else {
- if (value < minValue_) { minValue_ = value; }
- if (value > maxValue_) { maxValue_ = value; }
- }
+ if (!Float.isFinite(value)) { return; }
+ minValue_ = (value < minValue_) ? value : minValue_;
+ maxValue_ = (value > maxValue_) ? value : maxValue_;
if (levels_[0] == 0) {
compressWhileUpdating();
}
diff --git a/src/main/java/org/apache/datasketches/req/Buffer.java b/src/main/java/org/apache/datasketches/req/Buffer.java
index 52bc28e..1887946 100755
--- a/src/main/java/org/apache/datasketches/req/Buffer.java
+++ b/src/main/java/org/apache/datasketches/req/Buffer.java
@@ -5,8 +5,6 @@
package org.apache.datasketches.req;
-import static java.lang.Math.max;
-
import java.util.Arrays;
import org.apache.datasketches.SketchesArgumentException;
@@ -17,6 +15,7 @@ import org.apache.datasketches.SketchesArgumentException;
* @author Lee Rhodes
*/
class Buffer {
+ static final String LS = System.getProperty("line.separator");
private float[] arr_;
private int count_;
private int delta_;
@@ -42,7 +41,7 @@ class Buffer {
count_ = 0;
delta_ = delta;
capacity_ = capacity;
- sorted_ = false;
+ sorted_ = true;
}
/**
@@ -52,13 +51,14 @@ class Buffer {
Buffer(final Buffer buf) {
arr_ = buf.arr_.clone();
count_ = buf.count_;
+ delta_ = buf.delta_;
capacity_ = buf.capacity_;
sorted_ = buf.sorted_;
}
/**
* Appends the given item to the end of the active array and increments length().
- * This will expand the array if necessary. //TODO add delta?
+ * This will expand the array if necessary.
* @param item the given item
* @return this
*/
@@ -70,16 +70,6 @@ class Buffer {
}
/**
- * Returns the current capacity of this Buffer. The capacity is the total amount of storage
- * available.
- *
- * @return the current capacity
- */
- int capacity() {
- return capacity_;
- }
-
- /**
* Returns count of items less-than the given value.
* @param value the given value
* @return count of items less-than the given value.
@@ -100,22 +90,11 @@ class Buffer {
}
/**
- * Clears a range of the buffer
- * @param start the start of the range, inclusive
- * @param end the end of the range, exclusive
- * @return this
- */
- Buffer clear(final int start, final int end) {
- for (int i = start; i < end; i++) { arr_[i] = 0; }
- return this;
- }
-
- /**
* Ensures that the capacity of this Buffer is at least newCapacity.
* If newCapacity < capacity(), no action is taken.
* @return this
*/
- Buffer ensureCapacity(final int newCapacity) {
+ private Buffer ensureCapacity(final int newCapacity) {
if (newCapacity > capacity_) {
arr_ = Arrays.copyOf(arr_, newCapacity);
capacity_ = newCapacity;
@@ -128,36 +107,40 @@ class Buffer {
* @param space the requested space remaining
* @return this
*/
- Buffer ensureSpace(final int space) {
+ private Buffer ensureSpace(final int space) {
if ((count_ + space) > arr_.length) {
- ensureCapacity(count_ + max(space, delta_));
+ ensureCapacity(count_ + space + delta_);
}
return this;
}
/**
- * Extends the given item array starting at length(). This will expand the Buffer if necessary.
+ * Extends the given item array starting at length(). This will expand this Buffer if necessary.
+ * This buffer becomes unsorted after this operation.
* @param floatArray the given item array
* @return this Buffer
*/
Buffer extend(final float[] floatArray) {
final int len = floatArray.length;
ensureSpace(len);
- System.arraycopy(floatArray, 0, arr_, count_, len); //TODO a merge sort instead!
+ System.arraycopy(floatArray, 0, arr_, count_, len);
count_ += len;
sorted_ = false;
return this;
}
+
/**
- * Append other buffer to this buffer. Items beyond length() are ignored.
+ * Append other buffer to this buffer. Any items beyond other.length() are ignored.
+ * This will expand this Buffer if necessary.
+ * This buffer becomes unsorted after this operation.
* @param other the other buffer
* @return this
*/
Buffer extend(final Buffer other) { //may not need this
- final int len = other.length();
+ final int len = other.getItemCount();
ensureSpace(len);
- System.arraycopy(other.getArray(), 0, arr_, length(), len);
+ System.arraycopy(other.getArray(), 0, arr_, getItemCount(), len);
count_ += len;
sorted_ = false;
return this;
@@ -172,6 +155,16 @@ class Buffer {
}
/**
+ * Gets the current capacity of this Buffer. The capacity is the total amount of storage
+ * currently available without expanding the array.
+ *
+ * @return the current capacity
+ */
+ int getCapacity() {
+ return capacity_;
+ }
+
+ /**
* Returns an array of the even values from the range start (inclusive) to end (exclusive).
* The even values are with respect to the start index. If the starting index is odd with
* respect to the origin of the Buffer, then this will actually return the odd values.
@@ -190,11 +183,25 @@ class Buffer {
return out;
}
+ /**
+ * Gets an item given its index
+ * @param index the given index
+ * @return an item given its index
+ */
float getItem(final int index) {
return arr_[index];
}
/**
+ * Returns the item count.
+ *
+ * @return the number of active items currently in this array.
+ */
+ int getItemCount() {
+ return count_;
+ }
+
+ /**
* Returns an array of the odd values from the range start (inclusive) to end (exclusive).
* The odd values are with respect to the start index. If the starting index is odd with
* respect to the origin of the Buffer, then this will actually return the even values.
@@ -224,21 +231,14 @@ class Buffer {
}
/**
- * Returns the length (item count).
- *
- * @return the number of active items currently in this array.
- */
- int length() {
- return count_;
- }
-
- /**
* Merges the incoming sorted array into this sorted array.
* @param arrIn sorted array in
* @return this
*/
Buffer mergeSortIn(final float[] arrIn) {
- if (!sorted_) { throw new SketchesArgumentException("Must be sorted."); }
+ if (!sorted_) {
+ throw new SketchesArgumentException("Must be sorted.");
+ }
ensureSpace(arrIn.length);
int i = count_;
int j = arrIn.length;
@@ -254,17 +254,7 @@ class Buffer {
}
}
count_ += arrIn.length;
- setSorted(true);
- return this;
- }
-
- /**
- * Set the sorted state
- * @param sortedState the sorted state
- * @return the sorted state
- */
- Buffer setSorted(final boolean sortedState) {
- sorted_ = sortedState;
+ sorted_ = true;
return this;
}
@@ -279,36 +269,18 @@ class Buffer {
}
/**
- * Sorts this array from start to length;
- * @param start starting index
- * @param length number of items to sort
- * @return this
- */
- Buffer sort(final int start, final int length) {
- Arrays.sort(arr_, start, length);
- return this;
- }
-
- /**
- * Returns a new items array of all the active data.
- * @return a new items array with data.
- */
- float[] toItemArray() {
- return Arrays.copyOf(arr_, count_);
- }
-
- /**
- * Returns a new item array of the active data specified by the offset and length.
- *
- * @param offset the zero-based offset into the array.
- * @param length the number of items to copy to the new array.
- * @return a new item array of all the active data specified by the offset and length.
+ * Returns a printable formatted string of the values of this buffer separated by a single space.
+ * @param decimals The desired precision after the decimal point
+ * @return a printable, formatted string of the values of this buffer.
*/
- float[] toItemArray(final int offset, final int length) {
- if ((offset + length) > count_) {
- throw new SketchesArgumentException("Sum of arguments exceed current length().");
+ String toHorizList(final int decimals) {
+ final StringBuilder sb = new StringBuilder();
+ final String fmt = " %." + decimals + "f";
+ for (int i = 0; i < count_; i++) {
+ final String str = String.format(fmt, arr_[i]);
+ sb.append(str);
}
- return Arrays.copyOfRange(arr_, offset, offset + length);
+ return sb.toString();
}
/**
diff --git a/src/main/java/org/apache/datasketches/req/RelativeCompactor.java b/src/main/java/org/apache/datasketches/req/RelativeCompactor.java
index 8e97a0e..b2d5a46 100644
--- a/src/main/java/org/apache/datasketches/req/RelativeCompactor.java
+++ b/src/main/java/org/apache/datasketches/req/RelativeCompactor.java
@@ -19,12 +19,14 @@
package org.apache.datasketches.req;
-import static java.lang.Math.max;
+import static java.lang.Math.min;
import static java.lang.Math.round;
import static org.apache.datasketches.Util.numberOfTrailingOnes;
-import static org.apache.datasketches.req.RelativeErrorSketch.INIT_NUMBER_OF_SECTIONS;
-import static org.apache.datasketches.req.RelativeErrorSketch.MIN_K;
-import static org.apache.datasketches.req.RelativeErrorSketch.println;
+import static org.apache.datasketches.req.Buffer.LS;
+import static org.apache.datasketches.req.RelativeErrorQuantiles.INIT_NUMBER_OF_SECTIONS;
+import static org.apache.datasketches.req.RelativeErrorQuantiles.MIN_K;
+import static org.apache.datasketches.req.RelativeErrorQuantiles.print;
+import static org.apache.datasketches.req.RelativeErrorQuantiles.println;
import java.util.Random;
@@ -35,17 +37,20 @@ import java.util.Random;
public class RelativeCompactor {
private static final double SQRT2 = Math.sqrt(2.0);
private int numCompactions; //number of compaction operations performed
- private int state; //state of the deterministic compaction schedule
+
+ //State of the deterministic compaction schedule.
+ // If there are no merge operations performed, state == numCompactions
+ private int state;
//if there are no merge operations performed, state == numCompactions
private boolean coin; //true or false uniformly at random for each compaction
- private int sectionSize; //k
- private int numSections; //# of sections in the buffer, minimum 3
- Buffer buf;
- int lgWeight = 0;
+ private int sectionSize; //initialized with k
+ private double sectionSizeDbl;
+ private int numSections; //# of sections, minimum 3
+ private Buffer buf;
+ private final int lgWeight;
private boolean debug;
- Random rand = new Random();
-
+ private Random rand;
/**
* Constructor
@@ -53,8 +58,12 @@ public class RelativeCompactor {
* @param lgWeight this compactor's lgWeight
* @param debug optional
*/
- public RelativeCompactor(final int sectionSize, final int lgWeight, final boolean debug) {
+ RelativeCompactor(
+ final int sectionSize,
+ final int lgWeight,
+ final boolean debug) {
this.sectionSize = sectionSize;
+ sectionSizeDbl = sectionSize;
this.lgWeight = lgWeight;
this.debug = debug;
@@ -62,16 +71,25 @@ public class RelativeCompactor {
state = 0;
coin = false;
numSections = INIT_NUMBER_OF_SECTIONS;
- final int cap = 2 * numSections * sectionSize; //cap is always even
- buf = new Buffer(cap, cap / 4);
+ final int nCap = 2 * numSections * sectionSize; //nCap is always even
+ buf = new Buffer(nCap, nCap);
+ if (debug) { rand = new Random(1); }
+ else { rand = new Random(); }
+
+ if (debug) {
+ println(" New Compactor: height: " + lgWeight
+ + "\tsectionSize: " + sectionSize
+ + "\tnumSections: " + numSections + LS);
+ }
}
/**
* Copy Constructor
* @param other the compactor to be copied into this one
*/
- public RelativeCompactor(final RelativeCompactor other) {
+ RelativeCompactor(final RelativeCompactor other) {
sectionSize = other.sectionSize;
+ sectionSizeDbl = other.sectionSizeDbl;
lgWeight = other.lgWeight;
debug = other.debug;
numCompactions = other.numCompactions;
@@ -79,6 +97,8 @@ public class RelativeCompactor {
coin = other.coin;
numSections = other.numSections;
buf = new Buffer(other.buf);
+ if (debug) { rand = new Random(1); }
+ else { rand = new Random(); }
}
/**
@@ -86,7 +106,7 @@ public class RelativeCompactor {
* @param item the given item
* @return this;
*/
- public RelativeCompactor append(final float item) {
+ RelativeCompactor append(final float item) {
buf.append(item);
return this;
}
@@ -95,58 +115,77 @@ public class RelativeCompactor {
* Perform a compaction operation on this compactor
* @return the array of items to be promoted to the next level compactor
*/
- public float[] compact() {
- if (debug) { println("Compactor " + lgWeight + " Compacting ..."); }
- final int cap = capacity(); //ensures and gets
+ float[] compact() {
+ if (debug) {
+ println(" Compacting[" + lgWeight + "] nomCapacity: " + getNomCapacity()
+ + "\tsectionSize: " + sectionSize
+ + "\tnumSections: " + numSections
+ + "\tstate(bin): " + Integer.toBinaryString(state));
+ }
+
if (!buf.isSorted()) {
buf.sort(); //Footnote 1
}
- //choose a part of the buffer to compac
- final int compactionOffset;
- if (sectionSize < MIN_K) { //COMMENT: can be avoided by making sure sectionSize >= MIN_K
- //too small sections => compact half of the buffer always
- compactionOffset = cap / 2; //COMMENT: Not sure this makes sense and may be unneccesary
- }
- else { //Footnote 2
- final int secsToCompact = numberOfTrailingOnes(state) + 1;
- compactionOffset = (cap / 2) + ((numSections - secsToCompact) * sectionSize);
-
- if (numCompactions >= (1 << (numSections - 1))) { //make numSections larger
- numSections *= 2; //Footnote 3
- sectionSize = max(nearestEven(sectionSize / SQRT2), MIN_K); //Footnote 4
- }
- }
- //COMMENT: we can avoid this if we can guarantee that buf.length, compactionSize are even
- //if (((buf.length() - compactionOffset) % 2) == 1) { //ensure compacted part has an even size
- // if (compactionOffset > 0) { compactionOffset--; }
- //} else { compactionOffset++; }
- assert compactionOffset <= (buf.length() - 2); //Footnote 5; This is easier to read!
+ if (debug) { print(" "); print(toHorizontalList(0)); }
+
+ //choose a part of the buffer to compact
+ final int compactionOffset; //a.k.a. "s" see footnote 2
+ final int secsToCompact = min(numberOfTrailingOnes(state) + 1, numSections - 1);
+ compactionOffset = buf.getItemCount() - (secsToCompact * sectionSize);
+
+ adjustSectSizeNumSect(); //see Footnotes 3, 4 and 8
+
+ assert compactionOffset <= (buf.getItemCount() - 2); //Footnote 5; This is easier to read!
if ((numCompactions % 2) == 1) { coin = !coin; } //invert coin; Footnote 6
- else { coin = (rand.nextDouble() < 0.5); } //random coin flip
+ else { coin = (rand.nextDouble() < 0.5); } //random coin flip
final float[] promote = (coin)
- ? buf.getEvens(compactionOffset, buf.length())
- : buf.getOdds(compactionOffset, buf.length());
+ ? buf.getOdds(compactionOffset, buf.getItemCount())
+ : buf.getEvens(compactionOffset, buf.getItemCount());
- //if (debug) { println("RelativeCompactor: Compacting..."); } //Footnote 7
+ if (debug) { //Footnote 7
+ println(" s: " + compactionOffset
+ + "\tsecsToComp: " + secsToCompact
+ + "\tsectionSize: " + sectionSize
+ + "\tnumSections: " + numSections);
+ final int delete = buf.getItemCount() - compactionOffset;
+ final int promoteLen = promote.length;
+ final int offset = (coin) ? 1 : 0;
+ println(" Promote: " + promoteLen + "\tDelete: " + delete + "\tOffset: " + offset);
+ }
buf.trimLength(compactionOffset);
numCompactions += 1;
state += 1;
- if (debug) { println("Compactor: Done\n Buf Length :\t" + buf.length()); }
+ if (debug) {
+ println(" DONE: nomCapacity: " + getNomCapacity()
+ + "\tnumCompactions: " + numCompactions);
+ }
return promote;
} //End Compact
/**
- * Sets the current maximum capacity of this compactor.
+ * Gets the current nominal capacity of this compactor.
* @return the current maximum capacity of this compactor.
*/
- public int capacity() {
- buf.ensureCapacity(2 * numSections * sectionSize);
- return buf.capacity();
+ int getNomCapacity() {
+ final int nCap = 2 * numSections * sectionSize;
+ return nCap;
+ }
+
+ /**
+ * Extends the given item array starting at length() by merging the items into
+ * the already sorted array.
+ * This will expand the Buffer if necessary.
+ * @param items the given item array, which also must be sorted
+ * @return this
+ */
+ RelativeCompactor extendAndMerge(final float[] items) {
+ buf.mergeSortIn(items);
+ return this;
}
/**
@@ -154,7 +193,7 @@ public class RelativeCompactor {
* @param items the given items
* @return this.
*/
- public RelativeCompactor extend(final float[] items) {
+ RelativeCompactor extend(final float[] items) {
buf.extend(items);
return this;
}
@@ -163,58 +202,80 @@ public class RelativeCompactor {
* Gets a reference to this compactor's internal Buffer
* @return a reference to this compactor's internal Buffer
*/
- Buffer getBuf() { return buf; }
+ Buffer getBuffer() { return buf; }
/**
* Gets the current capacity of this compactor
* @return the current capacity of this compactor
*/
- public int getCapacity() {
- return buf.capacity();
+ int getCapacity() {
+ return buf.getCapacity();
}
/**
* Gets the lgWeight of this buffer
* @return the lgWeight of this buffer
*/
- public int getLgWeight() {
+ int getLgWeight() {
return lgWeight;
}
/**
- * Gets the length (number of retained values) in this compactor.
- * @return the length of this compactor
+ * Gets the number of retained values in this compactor.
+ * @return the number of retained values in this compactor.
*/
- public int length() { return buf.length(); }
+ int getNumRetainedEntries() { return buf.getItemCount(); }
/**
* Merge the other given compactor into this one
* @param other the other given compactor
+ * @param mergeSort if true, apply mergeSort algorithm instead of sort().
* @return this
*/
- public RelativeCompactor mergeIntoSelf(final RelativeCompactor other) {
+ RelativeCompactor merge(final RelativeCompactor other, final boolean mergeSort) {
state |= other.state;
numCompactions += other.numCompactions;
- buf.extend(other.getBuf());
- buf.sort(); //TODO this wasn't in Pavel's code
+ if (mergeSort) { //assumes this and other is already sorted
+ final float[] arrIn = other.getBuffer().getArray();
+ buf.mergeSortIn(arrIn);
+ } else {
+ buf.extend(other.getBuffer());
+ buf.sort();
+ }
return this;
}
/**
- * Sort only the values in this compactor that are not already sorted.
- * @return this
+ * Returns the nearest even integer to the given value.
+ * @param value the given value
+ * @return the nearest even integer to the given value.
*/
- public RelativeCompactor optimizedSort() { //TODO not done
- return this;
+ //also used by test
+ static final int nearestEven(final double value) {
+ return ((int) round(value / 2.0)) << 1;
}
/**
- * Gets the non-normalized rank of the given value. This is equal to the number of values in
+ * Returns a printable formatted string of the values of this buffer separated by a single space.
+ * This string is prepended by the lgWeight and retained entries of this compactor.
+ * @param decimals The desired precision after the decimal point
+ * @return a printable, formatted string of the values of this buffer.
+ */
+ String toHorizontalList(final int decimals) {
+ final int re = getNumRetainedEntries();
+ final int h = getLgWeight();
+ final String str = h + " [" + re + "]: " + buf.toHorizList(decimals) + LS;
+ return str;
+ }
+
+ /**
+ * Gets the non-normalized rank of the given value.
+ * This is equal to the number of values in
* this compactor that are < the given value.
* @param value the given value
* @return the non-normalized rank of the given value
*/
- public int rank(final float value) { //one-based integer
+ int rank(final float value) { //one-based integer
return buf.countLessThan(value);
}
@@ -222,18 +283,22 @@ public class RelativeCompactor {
* Sort all values in this compactor.
* @return this
*/
- public RelativeCompactor sort() {
+ RelativeCompactor sort() {
if (!buf.isSorted()) { buf.sort(); }
return this;
}
- @Override
- public String toString() {
- return null;
- }
-
- private static final int nearestEven(final double value) {
- return ((int) round(value / 2.0)) << 1;
+ /**
+ * This adjusts sectionSize and numSections and guarantees that the sectionSize
+ * will always be even and >= minK.
+ */
+ private void adjustSectSizeNumSect() {
+ final double newSectSizeDbl = sectionSizeDbl / SQRT2;
+ final int nearestEven = nearestEven(newSectSizeDbl);
+ if (nearestEven < MIN_K) { return; }
+ sectionSizeDbl = newSectSizeDbl;
+ sectionSize = nearestEven;
+ numSections <<= 1;
}
/* Footnotes:
@@ -260,5 +325,9 @@ public class RelativeCompactor {
*
* 7. Possible debug outputs: compactionOffset, numCompactions, sectionsToCompact, length,
* capacity, sectionSize, numSections
+ *
+ * 8. if (((buf.length() - compactionOffset) % 2) == 1) { //ensure compacted part has an even size
+ * if (compactionOffset > 0) { compactionOffset--; }
+ * } else { compactionOffset++; }
*/
}
diff --git a/src/main/java/org/apache/datasketches/req/RelativeErrorSketch.java b/src/main/java/org/apache/datasketches/req/RelativeErrorQuantiles.java
similarity index 57%
rename from src/main/java/org/apache/datasketches/req/RelativeErrorSketch.java
rename to src/main/java/org/apache/datasketches/req/RelativeErrorQuantiles.java
index 0ee325d..8186964 100644
--- a/src/main/java/org/apache/datasketches/req/RelativeErrorSketch.java
+++ b/src/main/java/org/apache/datasketches/req/RelativeErrorQuantiles.java
@@ -20,28 +20,28 @@
package org.apache.datasketches.req;
import static java.lang.Math.max;
+import static org.apache.datasketches.req.Buffer.LS;
import java.util.ArrayList;
import java.util.List;
/**
- * Proof-of-concept code for paper "Relative Error Streaming Quantiles",
+ * Java implementation for paper "Relative Error Streaming Quantiles",
* https://arxiv.org/abs/2004.01668.
*
* <p>This implementation differs from the algorithm described in the paper in the following:</p>
* <ul><li>The algorithm requires no upper bound on the stream length.
- * Instead, each relative-compactor (i.e. buffer) counts the number of compaction operations performed
- * so far (variable numCompactions). Initially, the relative-compactor starts with 2 buffer sections
+ * Instead, each relative-compactor counts the number of compaction operations performed
+ * so far (variable numCompactions). Initially, the relative-compactor starts with 3 sections
* and each time the numCompactions exceeds 2^{# of sections}, we double the number of sections
* (variable numSections).
* </li>
- * <li>The size of each buffer section (variable k and sectionSize in the code and parameter k in
+ * <li>The size of each section (variable k and sectionSize in the code and parameter k in
* the paper) is initialized with a value set by the user via variable k.
- * When the number of sections doubles, we decrease sectionSize by a factor of sqrt(2)
- * (for which we use a float variable sectionSizeF). As above, this is applied
- * at each level separately. Thus, when we double the number of sections, the buffer size
- * increases by a factor of sqrt(2) (up to +-1 after rounding).</li>
+ * When the number of sections doubles, we decrease sectionSize by a factor of sqrt(2).
+ * This is applied at each level separately. Thus, when we double the number of sections, the
+ * size increases by a factor of sqrt(2) (up to +-1 after rounding).</li>
* <li>The merge operation here does not perform "special compactions", which are used in the paper
* to allow for a tight analysis of the sketch.</li>
* </ul>
@@ -51,12 +51,12 @@ import java.util.List;
* @author Lee Rhodes
*/
@SuppressWarnings("unused")
-public class RelativeErrorSketch {
+public class RelativeErrorQuantiles {
//An initial upper bound on log_2 (number of compactions) + 1 COMMENT: Huh?
final static int INIT_NUMBER_OF_SECTIONS = 3;
final static int MIN_K = 4;
- //should be even; value of 50 roughly corresponds to 0.01-relative error guarantee wiTH
- //constant probability (TODO determine confidence bounds)
+ //should be even; value of 50 roughly corresponds to 0.01-relative error guarantee with
+ //constant probability
final static int DEFAULT_K = 50;
private int k; //default
@@ -64,14 +64,16 @@ public class RelativeErrorSketch {
List<RelativeCompactor> compactors = new ArrayList<>();
//int levels; //number of compactors; was H
int size; //retained items
- private int maxSize; //capacity
+ private int maxNomSize; //nominal capacity
private long totalN; //total items offered to sketch
+ private float minValue = Float.MAX_VALUE;
+ private float maxValue = Float.MIN_VALUE;
/**
* Constructor with default k = 50;
*
*/
- RelativeErrorSketch() {
+ public RelativeErrorQuantiles() {
this(DEFAULT_K, false);
}
@@ -79,7 +81,7 @@ public class RelativeErrorSketch {
* Constructor
* @param k Controls the size and error of the sketch
*/
- RelativeErrorSketch(final int k) {
+ public RelativeErrorQuantiles(final int k) {
this(k, false);
}
@@ -89,12 +91,13 @@ public class RelativeErrorSketch {
* rounded down by one.
* @param debug debug mode
*/
- RelativeErrorSketch(final int k, final boolean debug) {
+ public RelativeErrorQuantiles(final int k, final boolean debug) {
this.k = max(k & -2, MIN_K);
this.debug = debug;
size = 0;
- maxSize = 0;
+ maxNomSize = 0;
totalN = 0;
+ if (debug) { println("START:"); }
grow();
}
@@ -102,54 +105,73 @@ public class RelativeErrorSketch {
* Copy Constructor
* @param other the other sketch to be deep copied into this one.
*/
- RelativeErrorSketch(final RelativeErrorSketch other) {
+ RelativeErrorQuantiles(final RelativeErrorQuantiles other) {
k = other.k;
debug = other.debug;
for (int i = 0; i < other.levels(); i++) {
compactors.add(new RelativeCompactor(other.compactors.get(i)));
}
size = other.size;
- maxSize = other.maxSize;
+ maxNomSize = other.maxNomSize;
totalN = other.totalN;
}
void compress(final boolean lazy) {
- if (debug) { println("Compression Start ..."); }
- updateMaxSize();
- if (size < maxSize) { return; }
- for (int h = 0; h < compactors.size(); h++) { //# compactors
+ updateMaxNomSize();
+ if (debug) {
+ println("COMPRESS: sKsize: " + size + "\t>=\t"
+ + "\tmaxNomSize: " + maxNomSize
+ + "\tN: " + totalN);
+ }
+
+ if (size < maxNomSize) { return; }
+ for (int h = 0; h < compactors.size(); h++) {
final RelativeCompactor c = compactors.get(h);
- if (c.length() >= c.capacity()) {
- if ((h + 1) >= levels()) { grow(); } //add a level
- final float[] arr = c.compact();
- compactors.get(h + 1).extend(arr);
- size += arr.length;
- if (lazy && (size < maxSize)) { break; }
+ final int re = c.getNumRetainedEntries();
+ final int nc = c.getNomCapacity();
+
+ if (re >= nc) {
+ if ((h + 1) >= levels()) {
+ if (debug) {
+ println(" Must Add Compactor: len(c[" + h + "]): "
+ + re + "\t>=\tc[" + h + "].nomCapacity(): " + nc);
+ }
+ grow(); //add a level
+ }
+ final float[] promoted = c.compact();
+ compactors.get(h + 1).extendAndMerge(promoted);
+ updateRetainedItems();
+ if (lazy && (size < maxNomSize)) { break; }
}
}
if (debug) {
- println("Compresssion Done:\n RetainedItems:\t" + size + "\n Capacity :\t" + maxSize);
+ for (int h = 0; h < compactors.size(); h++) {
+ final RelativeCompactor c = compactors.get(h);
+ print(c.toHorizontalList(0));
+ }
+ println("COMPRESS: DONE: sKsize: " + size
+ + "\tMaxNomSize: " + maxNomSize + LS);
}
}
- double[] getCDF(final float[] splitPoints) {
- return getPmfOrCdf(splitPoints, true);
- }
+ // public double[] getCDF(final float[] splitPoints) {
+ // return getPmfOrCdf(splitPoints, true);
+ // }
@SuppressWarnings("static-method")
private double[] getPmfOrCdf(final float[] splitPoints, final boolean isCdf) {
return null;
}
- public double getQuantile(final double rank) {
-
- return 0;
- }
+ // public double getQuantile(final double rank) {
+ //
+ // return 0;
+ // }
void grow() {
compactors.add( new RelativeCompactor(k, levels(), debug));
- updateMaxSize();
+ updateMaxNomSize();
}
/**
@@ -184,18 +206,20 @@ public class RelativeErrorSketch {
* Merge other sketch into this one. The other sketch is not modified.
* @param other sketch to be merged into this one.
*/
- RelativeErrorSketch mergeIntoSelf(final RelativeErrorSketch other) {
+ RelativeErrorQuantiles merge(final RelativeErrorQuantiles other) {
//Grow until self has at least as many compactors as other
while (levels() < other.levels()) { grow(); }
//Append the items in same height compactors
for (int i = 0; i < levels(); i++) {
- compactors.get(i).mergeIntoSelf(other.compactors.get(i));
+ final boolean mergeSort = i > 0;
+ compactors.get(i).merge(other.compactors.get(i), mergeSort);
}
updateRetainedItems();
- // After merging, we should not be lazy when compressing the sketch (as the maxSize bound may
+ // After merging, we should not be lazy when compressing the sketch (as the maxNomSize bound may
// be exceeded on many levels)
- if (size >= maxSize) { compress(false); }
- assert size < maxSize;
+ if (size >= maxNomSize) { compress(false); }
+ updateRetainedItems();
+ assert size < maxNomSize;
return this;
}
@@ -214,7 +238,7 @@ public class RelativeErrorSketch {
int nnRank = 0;
for (int i = 0; i < levels(); i++) {
final RelativeCompactor c = compactors.get(i);
- nnRank += c.rank(value) * (1 << c.lgWeight);
+ nnRank += c.rank(value) * (1 << c.getLgWeight());
}
return (double)nnRank / totalN;
}
@@ -223,21 +247,52 @@ public class RelativeErrorSketch {
return null;
}
+ /**
+ * Returns a summary of the sketch and the horizontal lists for all compactors.
+ * @param decimals number of digits after the decimal point
+ * @return a summary of the sketch and the horizontal lists for all compactors.
+ */
+ public String getSummary(final int decimals) {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("**********Relative Error Quantiles Sketch Summary**********").append(LS);
+ final int numC = compactors.size();
+ sb.append(" N : " + totalN).append(LS);
+ sb.append(" Retained Entries: " + size).append(LS);
+ sb.append(" Max Nominal Size: " + maxNomSize).append(LS);
+ sb.append(" Min Value : " + minValue).append(LS);
+ sb.append(" Max Value : " + maxValue).append(LS);
+
+ sb.append(" Levels : " + compactors.size()).append(LS);
+ for (int i = 0; i < numC; i++) {
+ final RelativeCompactor c = compactors.get(i);
+ sb.append(" " + c.toHorizontalList(decimals));
+ }
+ sb.append("************************End Summary************************").append(LS);
+ return sb.toString();
+ }
+
void update(final float item) {
+ if (!Float.isFinite(item)) { return; }
+
final RelativeCompactor c = compactors.get(0).append(item);
size++;
- if (size >= maxSize) { compress(true); }
totalN++;
+ minValue = (item < minValue) ? item : minValue;
+ maxValue = (item > maxValue) ? item : maxValue;
+ if (size >= maxNomSize) {
+ c.sort();
+ compress(true);
+ }
}
/**
* Computes a new bound for determining when to compress the sketch.
* @return this
*/
-RelativeErrorSketch updateMaxSize() {
+RelativeErrorQuantiles updateMaxNomSize() {
int cap = 0;
- for (RelativeCompactor c : compactors) { cap += c.capacity(); } //get or set?
- maxSize = cap;
+ for (RelativeCompactor c : compactors) { cap += c.getNomCapacity(); }
+ maxNomSize = cap;
return this;
}
@@ -245,9 +300,9 @@ RelativeErrorSketch updateMaxSize() {
* Computes the size for the sketch.
* @return this
*/
-RelativeErrorSketch updateRetainedItems() {
+RelativeErrorQuantiles updateRetainedItems() {
int count = 0;
- for (RelativeCompactor c : compactors) { count += c.length(); }
+ for (RelativeCompactor c : compactors) { count += c.getNumRetainedEntries(); }
size = count;
return this;
}
diff --git a/src/main/java/org/apache/datasketches/req/RelativeErrorSketchIterator.java b/src/main/java/org/apache/datasketches/req/RelativeErrorSketchIterator.java
index 5e1459b..0a05f7d 100644
--- a/src/main/java/org/apache/datasketches/req/RelativeErrorSketchIterator.java
+++ b/src/main/java/org/apache/datasketches/req/RelativeErrorSketchIterator.java
@@ -33,10 +33,10 @@ public class RelativeErrorSketchIterator {
private int retainedItems;
private Buffer currentBuf;
- RelativeErrorSketchIterator(final RelativeErrorSketch sketch) {
+ RelativeErrorSketchIterator(final RelativeErrorQuantiles sketch) {
compactors = sketch.compactors;
retainedItems = sketch.size;
- currentBuf = compactors.get(0).buf;
+ currentBuf = compactors.get(0).getBuffer();
cIndex = 0;
bIndex = -1;
}
@@ -49,12 +49,12 @@ public class RelativeErrorSketchIterator {
*/
public boolean next() {
if ((retainedItems == 0)
- || ((cIndex == (compactors.size() - 1)) && (bIndex == currentBuf.length()))) {
+ || ((cIndex == (compactors.size() - 1)) && (bIndex == currentBuf.getItemCount()))) {
return false;
}
- if (bIndex == currentBuf.length()) {
+ if (bIndex == currentBuf.getItemCount()) {
cIndex++;
- currentBuf = compactors.get(cIndex).buf;
+ currentBuf = compactors.get(cIndex).getBuffer();
bIndex = 0;
} else {
bIndex++;
diff --git a/src/test/java/org/apache/datasketches/req/BufferTest.java b/src/test/java/org/apache/datasketches/req/BufferTest.java
index afbd64c..ee2ea03 100644
--- a/src/test/java/org/apache/datasketches/req/BufferTest.java
+++ b/src/test/java/org/apache/datasketches/req/BufferTest.java
@@ -33,16 +33,16 @@ public class BufferTest {
public void checkTrimLength() {
Buffer buf = new Buffer(16, 4);
for (int i = 0; i < 8; i++) { buf.append(i+1); }
- assertEquals(buf.length(), 8);
+ assertEquals(buf.getItemCount(), 8);
buf.trimLength(4);
- assertEquals(buf.length(), 4);
+ assertEquals(buf.getItemCount(), 4);
}
@Test
public void checkGetOdds() {
int cap = 16;
Buffer buf = new Buffer(cap, cap / 4);
- for (int i = 0; i < buf.capacity(); i++) {
+ for (int i = 0; i < buf.getCapacity(); i++) {
buf.append(i);
}
float[] out = buf.getOdds(0, cap);
@@ -56,10 +56,10 @@ public class BufferTest {
public void checkGetEvens() {
int cap = 15;
Buffer buf = new Buffer(cap, cap / 4);
- for (int i = 0; i < buf.capacity(); i++) {
+ for (int i = 0; i < buf.getCapacity(); i++) {
buf.append(i);
}
- float[] out = buf.getEvens(0, buf.capacity());
+ float[] out = buf.getEvens(0, buf.getCapacity());
println("");
for (int i = 0; i < out.length; i++) {
print((int)out[i] + " ");
@@ -70,25 +70,29 @@ public class BufferTest {
public void checkAppend() {
Buffer buf = new Buffer(2, 2);
buf.append(1);
- assertEquals(buf.length(), 1);
+ assertEquals(buf.getItemCount(), 1);
buf.append(2);
- assertEquals(buf.length(), 2);
+ assertEquals(buf.getItemCount(), 2);
buf.append(3);
- assertEquals(buf.capacity(), 4);
+ assertEquals(buf.getCapacity(), 4);
}
@Test
public void checkCountLessThan() {
Buffer buf = new Buffer(16, 2);
- buf.extend(new float[] {1,2,3,4,5,6,7,1});
- buf.setSorted(true);
+ float[] unsortedArr = {1,7,3,6,5,2,4};
+ buf.extend(unsortedArr); //unsorted flag
assertEquals(buf.countLessThan(4), 3);
- buf.setSorted(false);
- assertEquals(buf.countLessThan(4), 4);
- buf.clear(4, 7);
- assertEquals(buf.getItem(4), 0.0F);
- assertEquals(buf.getItem(5), 0.0F);
- assertEquals(buf.getItem(6), 0.0F);
+ buf = new Buffer(16, 2);
+ float[] sortedArr = {1,2,3,4,5,6,7};
+ buf.mergeSortIn(sortedArr);
+ assertEquals(buf.countLessThan(4), 3);
+ buf.mergeSortIn(sortedArr);
+ assertEquals(buf.countLessThan(4), 6);
+ buf.trimLength(12);
+ assertEquals(buf.getItemCount(), 12);
+ assertEquals(buf.getItem(12), 0.0F);
+ assertEquals(buf.getItem(13), 0.0F);
}
@Test
@@ -98,7 +102,7 @@ public class BufferTest {
float[] arr2 = {3,4};
buf.extend(arr1);
buf.extend(arr2);
- for (int i = 0; i < buf.length(); i++) {
+ for (int i = 0; i < buf.getItemCount(); i++) {
println(buf.getItem(i));
}
}
@@ -135,7 +139,7 @@ public class BufferTest {
buf.extend(arr1);
buf.sort();
buf.mergeSortIn(arr2);
- int len = buf.length();
+ int len = buf.getItemCount();
for (int i = 0; i < len; i++) { print(buf.getItem(i) + ", "); }
println("");
}
diff --git a/src/test/java/org/apache/datasketches/req/RelativeErrorSketchTest.java b/src/test/java/org/apache/datasketches/req/RelativeCompactorTest.java
similarity index 82%
copy from src/test/java/org/apache/datasketches/req/RelativeErrorSketchTest.java
copy to src/test/java/org/apache/datasketches/req/RelativeCompactorTest.java
index 5989e79..cd58730 100644
--- a/src/test/java/org/apache/datasketches/req/RelativeErrorSketchTest.java
+++ b/src/test/java/org/apache/datasketches/req/RelativeCompactorTest.java
@@ -19,21 +19,19 @@
package org.apache.datasketches.req;
+import static org.testng.Assert.assertEquals;
+
import org.testng.annotations.Test;
/**
* @author Lee Rhodes
*/
@SuppressWarnings("javadoc")
-public class RelativeErrorSketchTest {
+public class RelativeCompactorTest {
@Test
- public void test1() {
- RelativeErrorSketch sk = new RelativeErrorSketch(4, true); //w debug
- for (int i = 1; i < 100; i++) {
- sk.update(i);
- }
+ public void checkNearestEven() {
+ assertEquals(RelativeCompactor.nearestEven(-0.9), 0);
}
-
}
diff --git a/src/test/java/org/apache/datasketches/req/RelativeErrorSketchTest.java b/src/test/java/org/apache/datasketches/req/RelativeErrorSketchTest.java
index 5989e79..2c8afb4 100644
--- a/src/test/java/org/apache/datasketches/req/RelativeErrorSketchTest.java
+++ b/src/test/java/org/apache/datasketches/req/RelativeErrorSketchTest.java
@@ -30,10 +30,38 @@ public class RelativeErrorSketchTest {
@Test
public void test1() {
- RelativeErrorSketch sk = new RelativeErrorSketch(4, true); //w debug
- for (int i = 1; i < 100; i++) {
+ RelativeErrorQuantiles sk = new RelativeErrorQuantiles(4, true); //w debug
+ for (int i = 101; i-- > 1; ) {
sk.update(i);
}
+ print(sk.getSummary(0));
+
+ for (float i = 10; i <= 100; i += 10) {
+ printRank(sk, i + .5f);
+ }
+ }
+
+ private static void printRank(RelativeErrorQuantiles sk, float v) {
+ // prints the queried value next to the rank the sketch reports for it
+ println("Normalized Rank: value: " + v + ", rank: " + sk.rank(v));
+ }
+
+ @Test
+ public void strTest() {
+ final float[] values = {1, 2, 3};
+ final String format = " %.0f";
+ final StringBuilder out = new StringBuilder();
+ for (final float v : values) {
+ // each value is rendered with zero decimals and a leading space
+ out.append(String.format(format, v));
+ }
+ println(out.toString());
+ }
+
+ static final void print(final Object o) { System.out.print(o.toString()); }
+
+ static final void println(final Object o) { System.out.println(o.toString()); }
+
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org