You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by le...@apache.org on 2020/07/07 23:37:09 UTC
[incubator-datasketches-java] 06/10: Interm: Completed major
rewrite of Concurrent Theta Sketch unit tests and cleaned up main code as
well.
This is an automated email from the ASF dual-hosted git repository.
leerho pushed a commit to branch Refactor_Theta_Tuple
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-java.git
commit 8736b03d8b2ae49c3f2b1e26fa6c89127d39c2c4
Author: Lee Rhodes <le...@users.noreply.github.com>
AuthorDate: Wed Jul 1 23:00:22 2020 -0700
Interm: Completed major rewrite of Concurrent Theta Sketch unit tests
and cleaned up main code as well.
---
.../org/apache/datasketches/theta/AnotBimpl.java | 6 +-
.../theta/ConcurrentHeapThetaBuffer.java | 95 +-
.../theta/ConcurrentSharedThetaSketch.java | 15 +-
.../datasketches/theta/DirectCompactSketch.java | 9 +-
.../theta/DirectQuickSelectSketch.java | 2 +-
.../theta/DirectQuickSelectSketchR.java | 8 +-
.../datasketches/theta/EmptyCompactSketch.java | 7 +-
.../apache/datasketches/theta/HeapAlphaSketch.java | 8 +-
.../datasketches/theta/HeapCompactSketch.java | 11 +-
.../datasketches/theta/HeapQuickSelectSketch.java | 8 +-
.../datasketches/theta/HeapUpdateSketch.java | 12 +-
.../datasketches/theta/JaccardSimilarity.java | 12 +-
.../datasketches/theta/SingleItemSketch.java | 7 +-
.../java/org/apache/datasketches/theta/Sketch.java | 17 +-
.../apache/datasketches/theta/UpdateSketch.java | 6 +-
.../datasketches/theta/UpdateSketchBuilder.java | 6 +-
.../java/org/apache/datasketches/tuple/AnotB.java | 4 +-
.../apache/datasketches/tuple/Intersection.java | 6 +-
.../datasketches/theta/BackwardConversions.java | 4 +-
.../datasketches/theta/CompactSketchTest.java | 10 +-
.../ConcurrentDirectQuickSelectSketchTest.java | 1142 +++++++++-----------
.../theta/ConcurrentHeapQuickSelectSketchTest.java | 482 ++++-----
.../theta/DirectQuickSelectSketchTest.java | 2 +-
.../apache/datasketches/theta/DirectUnionTest.java | 4 +-
.../datasketches/theta/SetOpsCornerCasesTest.java | 28 +-
.../datasketches/theta/SingleItemSketchTest.java | 2 +-
.../org/apache/datasketches/theta/SketchTest.java | 12 +-
.../apache/datasketches/theta/UnionImplTest.java | 2 +-
28 files changed, 923 insertions(+), 1004 deletions(-)
diff --git a/src/main/java/org/apache/datasketches/theta/AnotBimpl.java b/src/main/java/org/apache/datasketches/theta/AnotBimpl.java
index 752baac..bccb455 100644
--- a/src/main/java/org/apache/datasketches/theta/AnotBimpl.java
+++ b/src/main/java/org/apache/datasketches/theta/AnotBimpl.java
@@ -92,7 +92,7 @@ final class AnotBimpl extends AnotB {
thetaLong_ = Math.min(thetaLong_, thetaLongB);
//Build hashtable and removes hashes of skB >= theta
- final int countB = skB.getRetainedEntries();
+ final int countB = skB.getRetainedEntries(true);
CompactSketch cskB = null;
UpdateSketch uskB = null;
final long[] hashTableB;
@@ -161,12 +161,12 @@ final class AnotBimpl extends AnotB {
? (CompactSketch)skA
: ((UpdateSketch)skA).compact();
final long[] hashArrA = cskA.getCache().clone();
- final int countA = cskA.getRetainedEntries();
+ final int countA = cskA.getRetainedEntries(true);
//Compare with skB
final long thetaLongB = skB.getThetaLong();
final long thetaLong = Math.min(thetaLongA, thetaLongB);
- final int countB = skB.getRetainedEntries();
+ final int countB = skB.getRetainedEntries(true);
//Rebuild hashtable and removes hashes of skB >= thetaLong
final long[] hashTableB = convertToHashTable(skB.getCache(), countB, thetaLong, REBUILD_THRESHOLD);
diff --git a/src/main/java/org/apache/datasketches/theta/ConcurrentHeapThetaBuffer.java b/src/main/java/org/apache/datasketches/theta/ConcurrentHeapThetaBuffer.java
index e4b18f2..e397ecc 100644
--- a/src/main/java/org/apache/datasketches/theta/ConcurrentHeapThetaBuffer.java
+++ b/src/main/java/org/apache/datasketches/theta/ConcurrentHeapThetaBuffer.java
@@ -29,14 +29,25 @@ import org.apache.datasketches.HashOperations;
import org.apache.datasketches.ResizeFactor;
/**
- * The theta filtering buffer that operates in the context of a single writing thread.
- * This is a bounded size filter. When the buffer becomes full its content is propagated into the
- * shared sketch.
- * The limit on the buffer size is configurable. Bound of size 1 allows to maintain error bound
- * that is close to the error bound of a sequential theta sketch.
+ * This is a theta filtering, bounded size buffer that operates in the context of a single writing
+ * thread. When the buffer becomes full its content is propagated into the shared sketch, which
+ * may be on a different thread. The limit on the buffer size is configurable. A bound of size 1
+ * allows the combination of buffers and shared sketch to maintain an error bound in real-time
+ * that is close to the error bound of a sequential theta sketch. Allowing larger buffer sizes
+ * enables amortization of the cost propagations and substantially improves overall system throughput.
+ * The error caused by the buffering is essentially a perspecitive of time and synchronization
+ * and not really a true error. At the end of a stream, after all the buffers have synchronized with
+ * the shared sketch, there is no additional error.
* Propagation is done either synchronously by the updating thread, or asynchronously by a
* background propagation thread.
*
+ * <p>This is a buffer, not a sketch, and it extends the <i>HeapQuickSelectSketch</i>
+ * in order to leverage some of the sketch machinery to make its work simple. However, if this
+ * buffer receives a query, like <i>getEstimate()</i>, the correct answer does not come from the super
+ * <i>HeapQuickSelectSketch</i>, which knows nothing about the concurrency relationship to the
+ * shared concurrent sketch, it must come from the shared concurrent sketch. As a result nearly all
+ * of the inherited sketch methods are redirected to the shared concurrent sketch.
+ *
* @author eshcar
* @author Lee Rhodes
*/
@@ -75,7 +86,41 @@ final class ConcurrentHeapThetaBuffer extends HeapQuickSelectSketch {
return Math.min(lgNomLongs, (int)Math.log(Math.sqrt(exactSize) / (2 * maxNumLocalBuffers)));
}
- //Sketch overrides
+ //concurrent restricted methods
+
+ /**
+ * Propagates a single hash value to the shared sketch
+ *
+ * @param hash to be propagated
+ */
+ private boolean propagateToSharedSketch(final long hash) {
+ //noinspection StatementWithEmptyBody
+ while (localPropagationInProgress.get()) {
+ } //busy wait until previous propagation completed
+ localPropagationInProgress.set(true);
+ final boolean res = shared.propagate(localPropagationInProgress, null, hash);
+ //in this case the parent empty_ and curCount_ were not touched
+ thetaLong_ = shared.getVolatileTheta();
+ return res;
+ }
+
+ /**
+ * Propagates the content of the buffer as a sketch to the shared sketch
+ */
+ private void propagateToSharedSketch() {
+ //noinspection StatementWithEmptyBody
+ while (localPropagationInProgress.get()) {
+ } //busy wait until previous propagation completed
+
+ final CompactSketch compactSketch = compact(propagateOrderedCompact, null);
+ localPropagationInProgress.set(true);
+ shared.propagate(localPropagationInProgress, compactSketch,
+ ConcurrentSharedThetaSketch.NOT_SINGLE_HASH);
+ super.reset();
+ thetaLong_ = shared.getVolatileTheta();
+ }
+
+ //Public Sketch overrides proxies to shared concurrent sketch
@Override
public int getCompactBytes() {
@@ -122,12 +167,14 @@ final class ConcurrentHeapThetaBuffer extends HeapQuickSelectSketch {
return shared.isEstimationMode();
}
+ //End of proxies
+
@Override
public byte[] toByteArray() {
throw new UnsupportedOperationException("Local theta buffer need not be serialized");
}
- //UpdateSketch overrides
+ //Public UpdateSketch overrides
@Override
public void reset() {
@@ -136,7 +183,7 @@ final class ConcurrentHeapThetaBuffer extends HeapQuickSelectSketch {
localPropagationInProgress.set(false);
}
- //restricted methods
+ //Restricted UpdateSketch overrides
/**
* Updates buffer with given hash value.
@@ -163,7 +210,7 @@ final class ConcurrentHeapThetaBuffer extends HeapQuickSelectSketch {
}
}
final UpdateReturnState state = super.hashUpdate(hash);
- if (isOutOfSpace(getRetainedEntries() + 1)) {
+ if (isOutOfSpace(getRetainedEntries(true) + 1)) {
propagateToSharedSketch();
return ConcurrentPropagated;
}
@@ -173,35 +220,5 @@ final class ConcurrentHeapThetaBuffer extends HeapQuickSelectSketch {
return state;
}
- /**
- * Propagates a single hash value to the shared sketch
- *
- * @param hash to be propagated
- */
- private boolean propagateToSharedSketch(final long hash) {
- //noinspection StatementWithEmptyBody
- while (localPropagationInProgress.get()) {
- } //busy wait until previous propagation completed
- localPropagationInProgress.set(true);
- final boolean res = shared.propagate(localPropagationInProgress, null, hash);
- //in this case the parent empty_ and curCount_ were not touched
- thetaLong_ = shared.getVolatileTheta();
- return res;
- }
-
- /**
- * Propagates the content of the buffer as a sketch to the shared sketch
- */
- private void propagateToSharedSketch() {
- //noinspection StatementWithEmptyBody
- while (localPropagationInProgress.get()) {
- } //busy wait until previous propagation completed
- final CompactSketch compactSketch = compact(propagateOrderedCompact, null);
- localPropagationInProgress.set(true);
- shared.propagate(localPropagationInProgress, compactSketch,
- ConcurrentSharedThetaSketch.NOT_SINGLE_HASH);
- super.reset();
- thetaLong_ = shared.getVolatileTheta();
- }
}
diff --git a/src/main/java/org/apache/datasketches/theta/ConcurrentSharedThetaSketch.java b/src/main/java/org/apache/datasketches/theta/ConcurrentSharedThetaSketch.java
index 3c2c4b1..49a8140 100644
--- a/src/main/java/org/apache/datasketches/theta/ConcurrentSharedThetaSketch.java
+++ b/src/main/java/org/apache/datasketches/theta/ConcurrentSharedThetaSketch.java
@@ -24,9 +24,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.datasketches.memory.WritableMemory;
/**
- * An interface to define the API of a concurrent shared theta sketch.
+ * An internal interface to define the API of a concurrent shared theta sketch.
* It reflects all data processed by a single or multiple update threads, and can serve queries at
- * any time
+ * any time.
*
* @author eshcar
*/
@@ -117,6 +117,17 @@ interface ConcurrentSharedThetaSketch {
//The following mirrors are public methods that already exist on the "extends" side of the dual
// inheritance. They are provided here to allow casts to this interface access
// to these methods without having to cast back to the extended parent class.
+ //
+ //This allows an internal class to cast either the Concurrent Direct or Concurrent Heap
+ //shared class to this interface and have access to the above special concurrent methods as
+ //well as the methods below.
+ //
+ //For the external user all of the below methods can be obtained by casting the shared
+ //sketch to UpdateSketch. However, these methods here also act as an alias so that an
+ //attempt to access these methods from the local buffer will be divered to the shared
+ //sketch.
+
+ //From Sketch
int getCompactBytes();
diff --git a/src/main/java/org/apache/datasketches/theta/DirectCompactSketch.java b/src/main/java/org/apache/datasketches/theta/DirectCompactSketch.java
index 458a93d..fc78cb0 100644
--- a/src/main/java/org/apache/datasketches/theta/DirectCompactSketch.java
+++ b/src/main/java/org/apache/datasketches/theta/DirectCompactSketch.java
@@ -142,7 +142,7 @@ class DirectCompactSketch extends CompactSketch {
@Override
public HashIterator iterator() {
- return new MemoryHashIterator(mem_, getRetainedEntries(), getThetaLong());
+ return new MemoryHashIterator(mem_, getRetainedEntries(true), getThetaLong());
}
@Override
@@ -172,7 +172,12 @@ class DirectCompactSketch extends CompactSketch {
}
@Override
- int getCurrentPreambleLongs(final boolean compact) { //already compact; ignore
+ int getCompactPreambleLongs() {
+ return extractPreLongs(mem_);
+ }
+
+ @Override
+ int getCurrentPreambleLongs() {
return extractPreLongs(mem_);
}
diff --git a/src/main/java/org/apache/datasketches/theta/DirectQuickSelectSketch.java b/src/main/java/org/apache/datasketches/theta/DirectQuickSelectSketch.java
index f80473a..9309194 100644
--- a/src/main/java/org/apache/datasketches/theta/DirectQuickSelectSketch.java
+++ b/src/main/java/org/apache/datasketches/theta/DirectQuickSelectSketch.java
@@ -263,7 +263,7 @@ class DirectQuickSelectSketch extends DirectQuickSelectSketchR {
return RejectedDuplicate; //Duplicate, not inserted
}
//insertion occurred, increment curCount
- final int curCount = getRetainedEntries() + 1;
+ final int curCount = getRetainedEntries(true) + 1;
wmem_.putInt(RETAINED_ENTRIES_INT, curCount); //update curCount
if (isOutOfSpace(curCount)) { //we need to do something, we are out of space
diff --git a/src/main/java/org/apache/datasketches/theta/DirectQuickSelectSketchR.java b/src/main/java/org/apache/datasketches/theta/DirectQuickSelectSketchR.java
index 36f4da3..d5290b1 100644
--- a/src/main/java/org/apache/datasketches/theta/DirectQuickSelectSketchR.java
+++ b/src/main/java/org/apache/datasketches/theta/DirectQuickSelectSketchR.java
@@ -215,12 +215,16 @@ class DirectQuickSelectSketchR extends UpdateSketch {
}
@Override
- int getCurrentPreambleLongs(final boolean compact) {
- if (!compact) { return PreambleUtil.extractPreLongs(wmem_); }
+ int getCompactPreambleLongs() {
return computeCompactPreLongs(isEmpty(), getRetainedEntries(true), getThetaLong());
}
@Override
+ int getCurrentPreambleLongs() {
+ return PreambleUtil.extractPreLongs(wmem_);
+ }
+
+ @Override
WritableMemory getMemory() {
return wmem_;
}
diff --git a/src/main/java/org/apache/datasketches/theta/EmptyCompactSketch.java b/src/main/java/org/apache/datasketches/theta/EmptyCompactSketch.java
index 54c71eb..ffdb668 100644
--- a/src/main/java/org/apache/datasketches/theta/EmptyCompactSketch.java
+++ b/src/main/java/org/apache/datasketches/theta/EmptyCompactSketch.java
@@ -138,7 +138,12 @@ final class EmptyCompactSketch extends CompactSketch {
}
@Override
- int getCurrentPreambleLongs(final boolean compact) {
+ int getCompactPreambleLongs() {
+ return 1;
+ }
+
+ @Override
+ int getCurrentPreambleLongs() {
return 1;
}
diff --git a/src/main/java/org/apache/datasketches/theta/HeapAlphaSketch.java b/src/main/java/org/apache/datasketches/theta/HeapAlphaSketch.java
index 5b1bb0c..2561b44 100644
--- a/src/main/java/org/apache/datasketches/theta/HeapAlphaSketch.java
+++ b/src/main/java/org/apache/datasketches/theta/HeapAlphaSketch.java
@@ -267,12 +267,16 @@ final class HeapAlphaSketch extends HeapUpdateSketch {
//restricted methods
@Override
- int getCurrentPreambleLongs(final boolean compact) {
- if (!compact) { return Family.ALPHA.getMinPreLongs(); }
+ int getCompactPreambleLongs() {
return CompactOperations.computeCompactPreLongs(empty_, curCount_, thetaLong_);
}
@Override
+ int getCurrentPreambleLongs() {
+ return Family.ALPHA.getMinPreLongs();
+ }
+
+ @Override
WritableMemory getMemory() {
return null;
}
diff --git a/src/main/java/org/apache/datasketches/theta/HeapCompactSketch.java b/src/main/java/org/apache/datasketches/theta/HeapCompactSketch.java
index 82decd5..b898a5e 100644
--- a/src/main/java/org/apache/datasketches/theta/HeapCompactSketch.java
+++ b/src/main/java/org/apache/datasketches/theta/HeapCompactSketch.java
@@ -82,7 +82,7 @@ class HeapCompactSketch extends CompactSketch {
@Override
public CompactSketch compact(final boolean dstOrdered, final WritableMemory dstMem) {
- return componentsToCompact(getThetaLong(), getRetainedEntries(), getSeedHash(), isEmpty(),
+ return componentsToCompact(getThetaLong(), getRetainedEntries(true), getSeedHash(), isEmpty(),
true, ordered_, dstOrdered, dstMem, getCache());
}
@@ -139,7 +139,12 @@ class HeapCompactSketch extends CompactSketch {
}
@Override
- int getCurrentPreambleLongs(final boolean compact) { //already compact; ignored
+ int getCompactPreambleLongs() {
+ return preLongs_;
+ }
+
+ @Override
+ int getCurrentPreambleLongs() { //already compact; ignored
return preLongs_;
}
@@ -164,7 +169,7 @@ class HeapCompactSketch extends CompactSketch {
final int singleItemBit = singleItem_ ? SINGLEITEM_FLAG_MASK : 0;
final byte flags = (byte) (emptyBit | READ_ONLY_FLAG_MASK | COMPACT_FLAG_MASK
| orderedBit | singleItemBit);
- final int preLongs = getCurrentPreambleLongs(true);
+ final int preLongs = getCompactPreambleLongs();
loadCompactMemory(getCache(), getSeedHash(), getRetainedEntries(true), getThetaLong(),
dstMem, flags, preLongs);
return byteArray;
diff --git a/src/main/java/org/apache/datasketches/theta/HeapQuickSelectSketch.java b/src/main/java/org/apache/datasketches/theta/HeapQuickSelectSketch.java
index e563c88..d6303bf 100644
--- a/src/main/java/org/apache/datasketches/theta/HeapQuickSelectSketch.java
+++ b/src/main/java/org/apache/datasketches/theta/HeapQuickSelectSketch.java
@@ -213,11 +213,15 @@ class HeapQuickSelectSketch extends HeapUpdateSketch {
}
@Override
- int getCurrentPreambleLongs(final boolean compact) {
- if (!compact) { return preambleLongs_; }
+ int getCompactPreambleLongs() {
return CompactOperations.computeCompactPreLongs(empty_, curCount_, thetaLong_);
}
+ @Override
+ int getCurrentPreambleLongs() {
+ return preambleLongs_;
+ }
+
//only used by ConcurrentHeapThetaBuffer & Test
int getHashTableThreshold() {
return hashTableThreshold_;
diff --git a/src/main/java/org/apache/datasketches/theta/HeapUpdateSketch.java b/src/main/java/org/apache/datasketches/theta/HeapUpdateSketch.java
index c214042..d62e88d 100644
--- a/src/main/java/org/apache/datasketches/theta/HeapUpdateSketch.java
+++ b/src/main/java/org/apache/datasketches/theta/HeapUpdateSketch.java
@@ -61,15 +61,8 @@ abstract class HeapUpdateSketch extends UpdateSketch {
//Sketch
@Override
- public int getCompactBytes() {
- final int preLongs = getCurrentPreambleLongs(true);
- final int dataLongs = getRetainedEntries(true);
- return (preLongs + dataLongs) << 3;
- }
-
- @Override
public int getCurrentBytes() {
- final int preLongs = getCurrentPreambleLongs(false);
+ final int preLongs = getCurrentPreambleLongs();
final int dataLongs = getCurrentDataLongs();
return (preLongs + dataLongs) << 3;
}
@@ -113,6 +106,7 @@ abstract class HeapUpdateSketch extends UpdateSketch {
return Util.computeSeedHash(getSeed());
}
+ //Used by HeapAlphaSketch and HeapQuickSelectSketch
byte[] toByteArray(final int preLongs, final byte familyID) {
if (isDirty()) { rebuild(); }
checkIllegalCurCountAndEmpty(isEmpty(), getRetainedEntries(true));
@@ -134,7 +128,7 @@ abstract class HeapUpdateSketch extends UpdateSketch {
insertCurCount(memOut, this.getRetainedEntries(true));
insertP(memOut, getP());
final long thetaLong =
- correctThetaOnCompact(isEmpty(), getRetainedEntries(), getThetaLong());
+ correctThetaOnCompact(isEmpty(), getRetainedEntries(true), getThetaLong());
insertThetaLong(memOut, thetaLong);
//Flags: BigEnd=0, ReadOnly=0, Empty=X, compact=0, ordered=0
diff --git a/src/main/java/org/apache/datasketches/theta/JaccardSimilarity.java b/src/main/java/org/apache/datasketches/theta/JaccardSimilarity.java
index ce93ab3..05c62cb 100644
--- a/src/main/java/org/apache/datasketches/theta/JaccardSimilarity.java
+++ b/src/main/java/org/apache/datasketches/theta/JaccardSimilarity.java
@@ -59,8 +59,8 @@ public final class JaccardSimilarity {
if (sketchA.isEmpty() && sketchB.isEmpty()) { return ONES.clone(); }
if (sketchA.isEmpty() || sketchB.isEmpty()) { return ZEROS.clone(); }
- final int countA = sketchA.getRetainedEntries();
- final int countB = sketchB.getRetainedEntries();
+ final int countA = sketchA.getRetainedEntries(true);
+ final int countB = sketchB.getRetainedEntries(true);
//Create the Union
final int minK = 1 << MIN_LG_NOM_LONGS;
@@ -74,7 +74,7 @@ public final class JaccardSimilarity {
final long thetaLongUAB = unionAB.getThetaLong();
final long thetaLongA = sketchA.getThetaLong();
final long thetaLongB = sketchB.getThetaLong();
- final int countUAB = unionAB.getRetainedEntries();
+ final int countUAB = unionAB.getRetainedEntries(true);
//Check for identical data
if ((countUAB == countA) && (countUAB == countB)
@@ -110,8 +110,8 @@ public final class JaccardSimilarity {
if (sketchA.isEmpty() && sketchB.isEmpty()) { return true; }
if (sketchA.isEmpty() || sketchB.isEmpty()) { return false; }
- final int countA = sketchA.getRetainedEntries();
- final int countB = sketchB.getRetainedEntries();
+ final int countA = sketchA.getRetainedEntries(true);
+ final int countB = sketchB.getRetainedEntries(true);
//Create the Union
final Union union =
@@ -122,7 +122,7 @@ public final class JaccardSimilarity {
final long thetaLongUAB = unionAB.getThetaLong();
final long thetaLongA = sketchA.getThetaLong();
final long thetaLongB = sketchB.getThetaLong();
- final int countUAB = unionAB.getRetainedEntries();
+ final int countUAB = unionAB.getRetainedEntries(true);
//Check for identical counts and thetas
if ((countUAB == countA) && (countUAB == countB)
diff --git a/src/main/java/org/apache/datasketches/theta/SingleItemSketch.java b/src/main/java/org/apache/datasketches/theta/SingleItemSketch.java
index 94a9bd2..cacd15e 100644
--- a/src/main/java/org/apache/datasketches/theta/SingleItemSketch.java
+++ b/src/main/java/org/apache/datasketches/theta/SingleItemSketch.java
@@ -395,7 +395,12 @@ final class SingleItemSketch extends CompactSketch {
}
@Override
- int getCurrentPreambleLongs(final boolean compact) {
+ int getCompactPreambleLongs() {
+ return 1;
+ }
+
+ @Override
+ int getCurrentPreambleLongs() {
return 1;
}
diff --git a/src/main/java/org/apache/datasketches/theta/Sketch.java b/src/main/java/org/apache/datasketches/theta/Sketch.java
index aea0657..251abc8 100644
--- a/src/main/java/org/apache/datasketches/theta/Sketch.java
+++ b/src/main/java/org/apache/datasketches/theta/Sketch.java
@@ -573,15 +573,24 @@ public abstract class Sketch {
*/
abstract long[] getCache();
+ /**
+ * Gets preamble longs if stored in compact form. If this sketch is already in compact form,
+ * this is identical to the call {@link #getCurrentPreambleLongs()}.
+ * @return preamble longs if stored in compact form.
+ */
+ abstract int getCompactPreambleLongs();
+
+ /**
+ * Gets the number of data longs if stored in current state.
+ * @return the number of data longs if stored in current state.
+ */
abstract int getCurrentDataLongs();
/**
* Returns preamble longs if stored in current state.
- * @param compact if true, returns the preamble longs required for compact form.
- * If this sketch is already in compact form this parameter is ignored.
- * @return preamble longs if stored.
+ * @return number of preamble longs if stored.
*/
- abstract int getCurrentPreambleLongs(boolean compact);
+ abstract int getCurrentPreambleLongs();
/**
* Returns the Memory object if it exists, otherwise null.
diff --git a/src/main/java/org/apache/datasketches/theta/UpdateSketch.java b/src/main/java/org/apache/datasketches/theta/UpdateSketch.java
index bed93a5..c28c983 100644
--- a/src/main/java/org/apache/datasketches/theta/UpdateSketch.java
+++ b/src/main/java/org/apache/datasketches/theta/UpdateSketch.java
@@ -137,14 +137,14 @@ public abstract class UpdateSketch extends Sketch {
@Override
public CompactSketch compact(final boolean dstOrdered, final WritableMemory dstMem) {
- return componentsToCompact(getThetaLong(), getRetainedEntries(), getSeedHash(), isEmpty(),
+ return componentsToCompact(getThetaLong(), getRetainedEntries(true), getSeedHash(), isEmpty(),
false, false, dstOrdered, dstMem, getCache());
}
@Override
public int getCompactBytes() {
- final int preLongs = getCurrentPreambleLongs(true);
- final int dataLongs = getRetainedEntries();
+ final int preLongs = getCompactPreambleLongs();
+ final int dataLongs = getRetainedEntries(true);
return (preLongs + dataLongs) << 3;
}
diff --git a/src/main/java/org/apache/datasketches/theta/UpdateSketchBuilder.java b/src/main/java/org/apache/datasketches/theta/UpdateSketchBuilder.java
index 705af29..dca3569 100644
--- a/src/main/java/org/apache/datasketches/theta/UpdateSketchBuilder.java
+++ b/src/main/java/org/apache/datasketches/theta/UpdateSketchBuilder.java
@@ -115,7 +115,7 @@ public class UpdateSketchBuilder {
* Be aware that sketches as large as this maximum value may not have been
* thoroughly tested or characterized for performance.
*
- * @param lgNomEntries the Log Nominal Entries for the concurrent shared sketch
+ * @param lgNomEntries the Log Nominal Entries. Also for the concurrent shared sketch
* @return this UpdateSketchBuilder
*/
public UpdateSketchBuilder setLogNominalEntries(final int lgNomEntries) {
@@ -139,7 +139,7 @@ public class UpdateSketchBuilder {
*
* @param nomEntries <a href="{@docRoot}/resources/dictionary.html#nomEntries">Nominal Entries</a>
* This will become the ceiling power of 2 if it is not.
- * @return this ConcurrentThetaBuilder
+ * @return this UpdateSketchBuilder
*/
public UpdateSketchBuilder setLocalNominalEntries(final int nomEntries) {
bLocalLgNomLongs = Integer.numberOfTrailingZeros(ceilingPowerOf2(nomEntries));
@@ -158,7 +158,7 @@ public class UpdateSketchBuilder {
* value have not been thoroughly tested or characterized for performance.
*
* @param lgNomEntries the Log Nominal Entries for a concurrent local sketch
- * @return this ConcurrentThetaBuilder
+ * @return this UpdateSketchBuilder
*/
public UpdateSketchBuilder setLocalLogNominalEntries(final int lgNomEntries) {
bLocalLgNomLongs = lgNomEntries;
diff --git a/src/main/java/org/apache/datasketches/tuple/AnotB.java b/src/main/java/org/apache/datasketches/tuple/AnotB.java
index ff5b37b..8dd3559 100644
--- a/src/main/java/org/apache/datasketches/tuple/AnotB.java
+++ b/src/main/java/org/apache/datasketches/tuple/AnotB.java
@@ -139,7 +139,7 @@ public final class AnotB<S extends Summary> {
final long thetaLongB = skB.getThetaLong();
thetaLong_ = Math.min(thetaLong_, thetaLongB);
//Build hashtable and removes hashes of skB >= theta
- final int countB = skB.getRetainedEntries();
+ final int countB = skB.getRetainedEntries(true);
final long[] hashTableB =
convertToHashTable(extractThetaHashArray(skB, countB), countB,
thetaLong_, REBUILD_THRESHOLD);
@@ -296,7 +296,7 @@ public final class AnotB<S extends Summary> {
//skB is not empty
final long thetaLongB = skB.getThetaLong();
final long thetaLong = Math.min(thetaLongA, thetaLongB);
- final int countB = skB.getRetainedEntries();
+ final int countB = skB.getRetainedEntries(true);
//Build/rebuild hashtable and removes hashes of skB >= thetaLong
final long[] hashTableB = //the following convert works for all theta sketches
diff --git a/src/main/java/org/apache/datasketches/tuple/Intersection.java b/src/main/java/org/apache/datasketches/tuple/Intersection.java
index a88223b..5dd5ea2 100644
--- a/src/main/java/org/apache/datasketches/tuple/Intersection.java
+++ b/src/main/java/org/apache/datasketches/tuple/Intersection.java
@@ -137,7 +137,7 @@ public class Intersection<S extends Summary> {
firstCall_ = false;
// input sketch is not null, could be first or next call
final long thetaLongIn = sketchIn.getThetaLong();
- final int countIn = sketchIn.getRetainedEntries();
+ final int countIn = sketchIn.getRetainedEntries(true);
thetaLong_ = min(thetaLong_, thetaLongIn); //Theta rule
// Empty rule extended in case incoming sketch does not have empty bit properly set
empty_ |= (countIn == 0) && (thetaLongIn == Long.MAX_VALUE);
@@ -159,7 +159,7 @@ public class Intersection<S extends Summary> {
}
final org.apache.datasketches.theta.Sketch nextSketch = sketchIn;
//Match nextSketch data with local instance data, filtering by theta
- final int maxMatchSize = min(hashTables_.count_, nextSketch.getRetainedEntries());
+ final int maxMatchSize = min(hashTables_.count_, nextSketch.getRetainedEntries(true));
final long[] matchHashArr = new long[maxMatchSize];
S[] matchSummaries = null;
@@ -263,7 +263,7 @@ public class Intersection<S extends Summary> {
}
void fromSketch(final org.apache.datasketches.theta.Sketch sketch, final S summary) {
- count_ = sketch.getRetainedEntries();
+ count_ = sketch.getRetainedEntries(true);
lgTableSize_ = getLgTableSize(count_);
S mySummary = null;
diff --git a/src/test/java/org/apache/datasketches/theta/BackwardConversions.java b/src/test/java/org/apache/datasketches/theta/BackwardConversions.java
index 5e3ed1f..0122f23 100644
--- a/src/test/java/org/apache/datasketches/theta/BackwardConversions.java
+++ b/src/test/java/org/apache/datasketches/theta/BackwardConversions.java
@@ -210,8 +210,8 @@ public class BackwardConversions {
return wmem;
}
//General CompactSketch
- final int preLongs = skV3.getCurrentPreambleLongs(true);
- final int entries = skV3.getRetainedEntries();
+ final int preLongs = skV3.getCompactPreambleLongs();
+ final int entries = skV3.getRetainedEntries(true);
final boolean unordered = !(skV3.isOrdered());
final byte flags = (byte) (0xA | (unordered ? 16 : 0)); //Unordered, NoRebuild, notEmpty, ReadOnly, LE
wmem = WritableMemory.allocate((preLongs + entries) << 3);
diff --git a/src/test/java/org/apache/datasketches/theta/CompactSketchTest.java b/src/test/java/org/apache/datasketches/theta/CompactSketchTest.java
index 502dd4b..fe8347e 100644
--- a/src/test/java/org/apache/datasketches/theta/CompactSketchTest.java
+++ b/src/test/java/org/apache/datasketches/theta/CompactSketchTest.java
@@ -118,7 +118,7 @@ public class CompactSketchTest {
assertNotNull(testSk.iterator());
assertEquals(testSk.toByteArray().length, 8);
assertEquals(testSk.getCache().length, 0);
- assertEquals(testSk.getCurrentPreambleLongs(true), 1);
+ assertEquals(testSk.getCompactPreambleLongs(), 1);
}
private static void checkSingleItemSketch(Sketch testSk, Sketch refSk) {
@@ -136,7 +136,7 @@ public class CompactSketchTest {
assertNotNull(testSk.iterator());
assertEquals(testSk.toByteArray().length, 16);
assertEquals(testSk.getCache().length, 1);
- assertEquals(testSk.getCurrentPreambleLongs(true), 1);
+ assertEquals(testSk.getCompactPreambleLongs(), 1);
}
private static void checkOtherCompactSketch(Sketch testSk, Sketch refSk, boolean ordered) {
@@ -162,12 +162,12 @@ public class CompactSketchTest {
assertTrue(testSk instanceof HeapCompactSketch);
}
assertEquals(testSk.getSeedHash(), refSk.getSeedHash());
- assertEquals(testSk.getRetainedEntries(true), refSk.getRetainedEntries());
+ assertEquals(testSk.getRetainedEntries(true), refSk.getRetainedEntries(true));
assertEquals(testSk.getEstimate(), refSk.getEstimate(), 0.0);
assertEquals(testSk.getCurrentBytes(), refSk.getCurrentBytes());
assertEquals(testSk.toByteArray().length, refSk.toByteArray().length);
assertEquals(testSk.getCache().length, refSk.getCache().length);
- assertEquals(testSk.getCurrentPreambleLongs(true), refSk.getCurrentPreambleLongs(true));
+ assertEquals(testSk.getCompactPreambleLongs(), refSk.getCompactPreambleLongs());
}
@Test
@@ -350,7 +350,7 @@ public class CompactSketchTest {
void check(CompactSketch csk) {
assertEquals(csk.getClass().getSimpleName(), classType, "ClassType");
- assertEquals(csk.getRetainedEntries(), count, "curCount");
+ assertEquals(csk.getRetainedEntries(true), count, "curCount");
assertEquals(csk.getCurrentBytes(), bytes, "Bytes" );
assertEquals(csk.isCompact(), compact, "Compact");
assertEquals(csk.isEmpty(), empty, "Empty");
diff --git a/src/test/java/org/apache/datasketches/theta/ConcurrentDirectQuickSelectSketchTest.java b/src/test/java/org/apache/datasketches/theta/ConcurrentDirectQuickSelectSketchTest.java
index 4a8e836..023a326 100644
--- a/src/test/java/org/apache/datasketches/theta/ConcurrentDirectQuickSelectSketchTest.java
+++ b/src/test/java/org/apache/datasketches/theta/ConcurrentDirectQuickSelectSketchTest.java
@@ -20,6 +20,7 @@
package org.apache.datasketches.theta;
import static org.apache.datasketches.Util.DEFAULT_UPDATE_SEED;
+import static org.apache.datasketches.theta.ConcurrentHeapQuickSelectSketchTest.waitForBgPropagationToComplete;
import static org.apache.datasketches.theta.PreambleUtil.FAMILY_BYTE;
import static org.apache.datasketches.theta.PreambleUtil.LG_NOM_LONGS_BYTE;
import static org.apache.datasketches.theta.PreambleUtil.SER_VER_BYTE;
@@ -31,8 +32,8 @@ import org.apache.datasketches.Family;
import org.apache.datasketches.HashOperations;
import org.apache.datasketches.SketchesArgumentException;
import org.apache.datasketches.memory.Memory;
-import org.apache.datasketches.memory.WritableDirectHandle;
import org.apache.datasketches.memory.WritableMemory;
+import org.apache.datasketches.theta.ConcurrentHeapQuickSelectSketchTest.SharedLocal;
import org.testng.annotations.Test;
/**
@@ -40,664 +41,476 @@ import org.testng.annotations.Test;
*/
@SuppressWarnings("javadoc")
public class ConcurrentDirectQuickSelectSketchTest {
-
- private int lgK;
- private volatile UpdateSketch shared;
-
- @Test(expectedExceptions = SketchesArgumentException.class)
- public void checkBadSerVer() {
- lgK = 9;
- int k = 1 << lgK;
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
-
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(mem);
- UpdateSketch local = bldr.buildLocal(shared);
-
- assertTrue(local.isEmpty());
-
- for (int i = 0; i< k; i++) { local.update(i); }
- waitForBgPropagationToComplete();
-
- assertFalse(local.isEmpty());
- assertEquals(local.getEstimate(), k, 0.0);
- assertEquals(shared.getRetainedEntries(false), k);
-
- mem.putByte(SER_VER_BYTE, (byte) 0); //corrupt the SerVer byte
-
- Sketch.wrap(mem);
- }
- }
+ private static final long SEED = DEFAULT_UPDATE_SEED;
@Test
public void checkDirectCompactConversion() {
- lgK = 9;
- int k = 1 << lgK;
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
- buildSharedReturnLocalSketch(mem);
- assertTrue(shared instanceof ConcurrentDirectQuickSelectSketch);
- assertTrue(shared.compact().isCompact());
- }
- }
-
- @Test(expectedExceptions = SketchesArgumentException.class)
- public void checkConstructorKtooSmall() {
- lgK = 3;
- int k = 1 << lgK;
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
- buildSharedReturnLocalSketch(mem);
- }
- }
-
- @Test(expectedExceptions = SketchesArgumentException.class)
- public void checkConstructorMemTooSmall() {
- lgK = 4;
- int k = 1 << lgK;
- try (WritableDirectHandle h = makeNativeMemory(k/2)) {
- WritableMemory mem = h.get();
- buildSharedReturnLocalSketch(mem);
- }
- }
-
- @Test(expectedExceptions = SketchesArgumentException.class)
- public void checkHeapifyIllegalFamilyID_heapify() {
- lgK = 9;
- int k = 1 << lgK;
- int bytes = (k << 4) + (Family.QUICKSELECT.getMinPreLongs() << 3);
- WritableMemory mem = WritableMemory.wrap(new byte[bytes]);
- buildSharedReturnLocalSketch(mem);
-
- mem.putByte(FAMILY_BYTE, (byte) 0); //corrupt the Family ID byte
-
- //try to heapify the corrupted mem
- Sketch.heapify(mem); //catch in Sketch.constructHeapSketch
+ int lgK = 9;
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ assertTrue(sl.shared instanceof ConcurrentDirectQuickSelectSketch);
+ assertTrue(sl.shared.compact().isCompact());
}
@Test
public void checkHeapifyMemoryEstimating() {
- lgK = 9;
+ int lgK = 9;
int k = 1 << lgK;
int u = 2*k;
- boolean estimating = (u > k);
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
-
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(mem);
- UpdateSketch local = bldr.buildLocal(shared);
- for (int i=0; i<u; i++) { local.update(i); }
- waitForBgPropagationToComplete();
-
- double sk1est = local.getEstimate();
- double sk1lb = local.getLowerBound(2);
- double sk1ub = local.getUpperBound(2);
- assertEquals(local.isEstimationMode(), estimating);
- assertEquals(local.getClass().getSimpleName(), "ConcurrentHeapThetaBuffer");
- int curCount1 = shared.getRetainedEntries(true);
- assertTrue(local.isDirect());
- assertTrue(local.hasMemory());
- assertEquals(local.getCurrentPreambleLongs(false), 3);
-
- UpdateSketch sharedHeap = Sketches.heapifyUpdateSketch(mem);
- assertEquals(sharedHeap.getEstimate(), sk1est);
- assertEquals(sharedHeap.getLowerBound(2), sk1lb);
- assertEquals(sharedHeap.getUpperBound(2), sk1ub);
- assertFalse(sharedHeap.isEmpty());
- assertEquals(sharedHeap.isEstimationMode(), estimating);
- assertEquals(sharedHeap.getClass().getSimpleName(), "HeapQuickSelectSketch");
- int curCount2 = sharedHeap.getRetainedEntries(true);
- long[] cache = sharedHeap.getCache();
- assertEquals(curCount1, curCount2);
- long thetaLong = sharedHeap.getThetaLong();
- int cacheCount = HashOperations.count(cache, thetaLong);
- assertEquals(curCount1, cacheCount);
- assertFalse(sharedHeap.isDirect());
- assertFalse(sharedHeap.hasMemory());
- }
- }
+ UpdateSketch shared = sl.shared; //off-heap
+ UpdateSketch local = sl.local;
- @Test(expectedExceptions = SketchesArgumentException.class)
- public void checkWrapIllegalFamilyID_wrap() {
- lgK = 9;
- int k = 1 << lgK;
- int maxBytes = (k << 4) + (Family.QUICKSELECT.getMinPreLongs() << 3);
- WritableMemory mem = WritableMemory.wrap(new byte[maxBytes]);
+ for (int i=0; i<u; i++) { local.update(i); }
+ waitForBgPropagationToComplete(shared);
- buildSharedReturnLocalSketch(mem);
+ assertEquals(shared.getClass().getSimpleName(), "ConcurrentDirectQuickSelectSketch");
+ assertEquals(local.getClass().getSimpleName(), "ConcurrentHeapThetaBuffer");
- mem.putByte(FAMILY_BYTE, (byte) 0); //corrupt the Sketch ID byte
+ //This sharedHeap is not linked to the concurrent local buffer
+ UpdateSketch sharedHeap = Sketches.heapifyUpdateSketch(sl.wmem);
+ assertEquals(sharedHeap.getClass().getSimpleName(), "HeapQuickSelectSketch");
- //try to wrap the corrupted mem
- Sketch.wrap(mem); //catch in Sketch.constructDirectSketch
+ checkMemoryDirectProxyMethods(local, shared);
+ checkOtherProxyMethods(local, shared);
+ checkOtherProxyMethods(local, sharedHeap);
+
+ int curCount1 = shared.getRetainedEntries(true);
+ int curCount2 = sharedHeap.getRetainedEntries(true);
+ assertEquals(curCount1, curCount2);
+ long[] cache = sharedHeap.getCache();
+ long thetaLong = sharedHeap.getThetaLong();
+ int cacheCount = HashOperations.count(cache, thetaLong);
+ assertEquals(curCount1, cacheCount);
+ assertEquals(local.getCurrentPreambleLongs(), 3);
}
- @Test(expectedExceptions = SketchesArgumentException.class)
- public void checkWrapIllegalFamilyID_direct() {
- lgK = 9;
+ @Test
+ public void checkHeapifyByteArrayExact() {
+ int lgK = 9;
int k = 1 << lgK;
- int maxBytes = (k << 4) + (Family.QUICKSELECT.getMinPreLongs() << 3);
- WritableMemory mem = WritableMemory.wrap(new byte[maxBytes]);
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
- buildSharedReturnLocalSketch(mem);
+ for (int i=0; i< k; i++) { local.update(i); }
+ waitForBgPropagationToComplete(shared);
- mem.putByte(FAMILY_BYTE, (byte) 0); //corrupt the Sketch ID byte
+ byte[] serArr = shared.toByteArray();
+ Memory srcMem = Memory.wrap(serArr);
+ Sketch recoveredShared = Sketch.heapify(srcMem);
- //try to wrap the corrupted mem
- DirectQuickSelectSketch.writableWrap(mem, DEFAULT_UPDATE_SEED);
- }
+ //reconstruct to Native/Direct
+ final int bytes = Sketch.getMaxUpdateSketchBytes(k);
+ final WritableMemory wmem = WritableMemory.allocate(bytes);
+ shared = sl.bldr.buildSharedFromSketch((UpdateSketch)recoveredShared, wmem);
+ UpdateSketch local2 = sl.bldr.buildLocal(shared);
- @Test(expectedExceptions = SketchesArgumentException.class)
- public void checkHeapifySeedConflict() {
- lgK = 9;
- int k = 1 << lgK;
- long seed1 = 1021;
- long seed2 = DEFAULT_UPDATE_SEED;
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
-
- final UpdateSketchBuilder bldr = configureBuilder().setSeed(seed1);
- //must build shared first
- shared = bldr.buildShared(mem);
- byte[] serArr = shared.toByteArray();
- Memory srcMem = Memory.wrap(serArr);
- Sketch.heapify(srcMem, seed2);
- }
- }
+ assertEquals(local2.getEstimate(), k, 0.0);
+ assertEquals(local2.getLowerBound(2), k, 0.0);
+ assertEquals(local2.getUpperBound(2), k, 0.0);
+ assertEquals(local2.isEmpty(), false);
+ assertEquals(local2.isEstimationMode(), false);
+ assertEquals(recoveredShared.getClass().getSimpleName(), "HeapQuickSelectSketch");
- @Test(expectedExceptions = SketchesArgumentException.class)
- public void checkCorruptLgNomLongs() {
- lgK = 4;
- int k = 1 << lgK;
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
- buildSharedReturnLocalSketch(mem);
- mem.putByte(LG_NOM_LONGS_BYTE, (byte)2); //corrupt
- Sketch.heapify(mem, DEFAULT_UPDATE_SEED);
- }
- }
+ // Run toString just to make sure that we can pull out all of the relevant information.
+ // That is, this is being run for its side-effect of accessing things.
+ // If something is wonky, it will generate an exception and fail the test.
+ local2.toString(true, true, 8, true);
- @Test(expectedExceptions = UnsupportedOperationException.class)
- public void checkIllegalHashUpdate() {
- lgK = 4;
- int k = 1 << lgK;
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- buildSharedReturnLocalSketch(h.get());
- shared.hashUpdate(1);
- }
- }
-
- @Test
- public void checkHeapifyByteArrayExact() {
- lgK = 9;
- int k = 1 << lgK;
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
-
- final UpdateSketchBuilder bldr = configureBuilder();
- UpdateSketch local = buildSharedReturnLocalSketch(mem);
-
- for (int i=0; i< k; i++) { local.update(i); }
- waitForBgPropagationToComplete();
-
- byte[] serArr = shared.toByteArray();
- Memory srcMem = Memory.wrap(serArr);
- Sketch recoveredShared = Sketch.heapify(srcMem);
-
- //reconstruct to Native/Direct
- final int bytes = Sketch.getMaxUpdateSketchBytes(k);
- final WritableMemory wmem = WritableMemory.allocate(bytes);
- shared = bldr.buildSharedFromSketch((UpdateSketch)recoveredShared, wmem);
- UpdateSketch local2 = bldr.buildLocal(shared);
-
- assertEquals(local2.getEstimate(), k, 0.0);
- assertEquals(local2.getLowerBound(2), k, 0.0);
- assertEquals(local2.getUpperBound(2), k, 0.0);
- assertEquals(local2.isEmpty(), false);
- assertEquals(local2.isEstimationMode(), false);
- assertEquals(recoveredShared.getClass().getSimpleName(), "HeapQuickSelectSketch");
-
- // Run toString just to make sure that we can pull out all of the relevant information.
- // That is, this is being run for its side-effect of accessing things.
- // If something is wonky, it will generate an exception and fail the test.
- local2.toString(true, true, 8, true);
- }
}
@Test
public void checkHeapifyByteArrayEstimating() {
- lgK = 12;
+ int lgK = 12;
int k = 1 << lgK;
int u = 2*k;
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
- final UpdateSketchBuilder bldr = configureBuilder();
- UpdateSketch local = buildSharedReturnLocalSketch(mem);
-
- for (int i=0; i<u; i++) { local.update(i); }
- waitForBgPropagationToComplete();
-
- double uskEst = local.getEstimate();
- double uskLB = local.getLowerBound(2);
- double uskUB = local.getUpperBound(2);
- assertEquals(local.isEstimationMode(), true);
-
- byte[] serArr = shared.toByteArray();
- Memory srcMem = Memory.wrap(serArr);
- Sketch recoveredShared = Sketch.heapify(srcMem);
-
- //reconstruct to Native/Direct
- final int bytes = Sketch.getMaxUpdateSketchBytes(k);
- final WritableMemory wmem = WritableMemory.allocate(bytes);
- shared = bldr.buildSharedFromSketch((UpdateSketch)recoveredShared, wmem);
- UpdateSketch local2 = bldr.buildLocal(shared);
-
-
- assertEquals(local2.getEstimate(), uskEst);
- assertEquals(local2.getLowerBound(2), uskLB);
- assertEquals(local2.getUpperBound(2), uskUB);
- assertEquals(local2.isEmpty(), false);
- assertEquals(local2.isEstimationMode(), true);
- assertEquals(recoveredShared.getClass().getSimpleName(), "HeapQuickSelectSketch");
- }
+
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
+
+ for (int i=0; i<u; i++) { local.update(i); }
+ waitForBgPropagationToComplete(shared);
+
+ double uskEst = local.getEstimate();
+ double uskLB = local.getLowerBound(2);
+ double uskUB = local.getUpperBound(2);
+ assertEquals(local.isEstimationMode(), true);
+
+ byte[] serArr = shared.toByteArray();
+ Memory srcMem = Memory.wrap(serArr);
+ Sketch recoveredShared = Sketch.heapify(srcMem);
+
+ //reconstruct to Native/Direct
+ final int bytes = Sketch.getMaxUpdateSketchBytes(k);
+ final WritableMemory wmem = WritableMemory.allocate(bytes);
+ shared = sl.bldr.buildSharedFromSketch((UpdateSketch)recoveredShared, wmem);
+ UpdateSketch local2 = sl.bldr.buildLocal(shared);
+
+ assertEquals(local2.getEstimate(), uskEst);
+ assertEquals(local2.getLowerBound(2), uskLB);
+ assertEquals(local2.getUpperBound(2), uskUB);
+ assertEquals(local2.isEmpty(), false);
+ assertEquals(local2.isEstimationMode(), true);
+ assertEquals(recoveredShared.getClass().getSimpleName(), "HeapQuickSelectSketch");
}
@Test
public void checkWrapMemoryEst() {
- lgK = 9;
+ int lgK = 9;
int k = 1 << lgK;
int u = 2*k;
boolean estimating = (u > k);
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
- UpdateSketch local = buildSharedReturnLocalSketch(mem);
- for (int i=0; i<u; i++) { local.update(i); }
- waitForBgPropagationToComplete();
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
- double sk1est = local.getEstimate();
- double sk1lb = local.getLowerBound(2);
- double sk1ub = local.getUpperBound(2);
- assertEquals(local.isEstimationMode(), estimating);
+ for (int i=0; i<u; i++) { local.update(i); }
+ waitForBgPropagationToComplete(shared);
- Sketch local2 = Sketch.wrap(mem);
+ double sk1est = local.getEstimate();
+ double sk1lb = local.getLowerBound(2);
+ double sk1ub = local.getUpperBound(2);
+ assertEquals(local.isEstimationMode(), estimating);
- assertEquals(local2.getEstimate(), sk1est);
- assertEquals(local2.getLowerBound(2), sk1lb);
- assertEquals(local2.getUpperBound(2), sk1ub);
- assertEquals(local2.isEmpty(), false);
- assertEquals(local2.isEstimationMode(), estimating);
- }
+ Sketch local2 = Sketch.wrap(sl.wmem);
+
+ assertEquals(local2.getEstimate(), sk1est);
+ assertEquals(local2.getLowerBound(2), sk1lb);
+ assertEquals(local2.getUpperBound(2), sk1ub);
+ assertEquals(local2.isEmpty(), false);
+ assertEquals(local2.isEstimationMode(), estimating);
}
@Test
public void checkDQStoCompactForms() {
- lgK = 9;
+ int lgK = 9;
int k = 1 << lgK;
int u = 4*k;
boolean estimating = (u > k);
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
-
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(mem);
- UpdateSketch local = bldr.buildLocal(shared);
-
- assertEquals(local.getClass().getSimpleName(), "ConcurrentHeapThetaBuffer");
- assertTrue(local.isDirect());
- assertTrue(local.hasMemory());
-
- for (int i=0; i<u; i++) { local.update(i); }
- waitForBgPropagationToComplete();
-
- shared.rebuild(); //forces size back to k
-
- //get baseline values
- double localEst = local.getEstimate();
- double localLB = local.getLowerBound(2);
- double localUB = local.getUpperBound(2);
- assertEquals(local.isEstimationMode(), estimating);
-
- CompactSketch csk;
-
- csk = shared.compact(false, null);
- assertEquals(csk.getEstimate(), localEst);
- assertEquals(csk.getLowerBound(2), localLB);
- assertEquals(csk.getUpperBound(2), localUB);
- assertFalse(csk.isEmpty());
- assertEquals(csk.isEstimationMode(), estimating);
- assertEquals(csk.getClass().getSimpleName(), "HeapCompactSketch");
-
- csk = shared.compact(true, null);
- assertEquals(csk.getEstimate(), localEst);
- assertEquals(csk.getLowerBound(2), localLB);
- assertEquals(csk.getUpperBound(2), localUB);
- assertFalse(csk.isEmpty());
- assertEquals(csk.isEstimationMode(), estimating);
- assertEquals(csk.getClass().getSimpleName(), "HeapCompactSketch");
-
- int bytes = local.getCompactBytes(); //TODO WHAT IS GOING ON HERE
- assertEquals(bytes, (k*8) + (Family.COMPACT.getMaxPreLongs() << 3));
- byte[] memArr2 = new byte[bytes];
- WritableMemory mem2 = WritableMemory.wrap(memArr2);
-
- csk = shared.compact(false, mem2);
- assertEquals(csk.getEstimate(), localEst);
- assertEquals(csk.getLowerBound(2), localLB);
- assertEquals(csk.getUpperBound(2), localUB);
- assertFalse(csk.isEmpty());
- assertEquals(csk.isEstimationMode(), estimating);
- assertEquals(csk.getClass().getSimpleName(), "DirectCompactSketch");
-
- mem2.clear();
- csk = shared.compact(true, mem2);
- assertEquals(csk.getEstimate(), localEst);
- assertEquals(csk.getLowerBound(2), localLB);
- assertEquals(csk.getUpperBound(2), localUB);
- assertFalse(csk.isEmpty());
- assertEquals(csk.isEstimationMode(), estimating);
- assertEquals(csk.getClass().getSimpleName(), "DirectCompactSketch");
- csk.toString(false, true, 0, false);
- }
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
+
+ assertEquals(local.getClass().getSimpleName(), "ConcurrentHeapThetaBuffer");
+ assertFalse(local.isDirect());
+ assertTrue(local.hasMemory());
+
+ for (int i=0; i<u; i++) { local.update(i); }
+ waitForBgPropagationToComplete(shared);
+
+ shared.rebuild(); //forces size back to k
+
+ //get baseline values
+ double localEst = local.getEstimate();
+ double localLB = local.getLowerBound(2);
+ double localUB = local.getUpperBound(2);
+ assertEquals(local.isEstimationMode(), estimating);
+
+ CompactSketch csk;
+
+ csk = shared.compact(false, null);
+ assertEquals(csk.getEstimate(), localEst);
+ assertEquals(csk.getLowerBound(2), localLB);
+ assertEquals(csk.getUpperBound(2), localUB);
+ assertFalse(csk.isEmpty());
+ assertEquals(csk.isEstimationMode(), estimating);
+ assertEquals(csk.getClass().getSimpleName(), "HeapCompactSketch");
+
+ csk = shared.compact(true, null);
+ assertEquals(csk.getEstimate(), localEst);
+ assertEquals(csk.getLowerBound(2), localLB);
+ assertEquals(csk.getUpperBound(2), localUB);
+ assertFalse(csk.isEmpty());
+ assertEquals(csk.isEstimationMode(), estimating);
+ assertEquals(csk.getClass().getSimpleName(), "HeapCompactSketch");
+
+ int bytes = shared.getCompactBytes();
+ assertEquals(bytes, (k*8) + (Family.COMPACT.getMaxPreLongs() << 3));
+ byte[] memArr2 = new byte[bytes];
+ WritableMemory mem2 = WritableMemory.wrap(memArr2);
+
+ csk = shared.compact(false, mem2);
+ assertEquals(csk.getEstimate(), localEst);
+ assertEquals(csk.getLowerBound(2), localLB);
+ assertEquals(csk.getUpperBound(2), localUB);
+ assertFalse(csk.isEmpty());
+ assertEquals(csk.isEstimationMode(), estimating);
+ assertEquals(csk.getClass().getSimpleName(), "DirectCompactSketch");
+
+ mem2.clear();
+ csk = shared.compact(true, mem2);
+ assertEquals(csk.getEstimate(), localEst);
+ assertEquals(csk.getLowerBound(2), localLB);
+ assertEquals(csk.getUpperBound(2), localUB);
+ assertFalse(csk.isEmpty());
+ assertEquals(csk.isEstimationMode(), estimating);
+ assertEquals(csk.getClass().getSimpleName(), "DirectCompactSketch");
+ csk.toString(false, true, 0, false);
}
@Test
public void checkDQStoCompactEmptyForms() {
- lgK = 9;
- int k = 1 << lgK;
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
-
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(mem);
- UpdateSketch local = bldr.buildLocal(shared);
-
- //empty
- local.toString(false, true, 0, false); //exercise toString
- assertEquals(local.getClass().getSimpleName(), "ConcurrentHeapThetaBuffer");
- double localEst = local.getEstimate();
- double localLB = local.getLowerBound(2);
- double localUB = local.getUpperBound(2);
- assertFalse(local.isEstimationMode());
-
- int bytes = local.getCompactBytes(); //compact form
- assertEquals(bytes, 8);
- byte[] memArr2 = new byte[bytes];
- WritableMemory mem2 = WritableMemory.wrap(memArr2);
-
- CompactSketch csk2 = shared.compact(false, mem2);
- assertEquals(csk2.getEstimate(), localEst);
- assertEquals(csk2.getLowerBound(2), localLB);
- assertEquals(csk2.getUpperBound(2), localUB);
- assertTrue(csk2.isEmpty());
- assertFalse(csk2.isEstimationMode());
- assertTrue(csk2.isOrdered());
- CompactSketch csk3 = shared.compact(true, mem2);
- csk3.toString(false, true, 0, false);
- csk3.toString();
- assertEquals(csk3.getEstimate(), localEst);
- assertEquals(csk3.getLowerBound(2), localLB);
- assertEquals(csk3.getUpperBound(2), localUB);
- assertTrue(csk3.isEmpty());
- assertFalse(csk3.isEstimationMode());
- assertTrue(csk2.isOrdered());
- }
+ int lgK = 9;
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
+
+ //empty
+ local.toString(false, true, 0, false); //exercise toString
+ assertEquals(local.getClass().getSimpleName(), "ConcurrentHeapThetaBuffer");
+ double localEst = local.getEstimate();
+ double localLB = local.getLowerBound(2);
+ double localUB = local.getUpperBound(2);
+ assertFalse(local.isEstimationMode());
+
+ int bytes = local.getCompactBytes(); //compact form
+ assertEquals(bytes, 8);
+ byte[] memArr2 = new byte[bytes];
+ WritableMemory mem2 = WritableMemory.wrap(memArr2);
+
+ CompactSketch csk2 = shared.compact(false, mem2);
+ assertEquals(csk2.getEstimate(), localEst);
+ assertEquals(csk2.getLowerBound(2), localLB);
+ assertEquals(csk2.getUpperBound(2), localUB);
+ assertTrue(csk2.isEmpty());
+ assertFalse(csk2.isEstimationMode());
+ assertTrue(csk2.isOrdered());
+ CompactSketch csk3 = shared.compact(true, mem2);
+ csk3.toString(false, true, 0, false);
+ csk3.toString();
+ assertEquals(csk3.getEstimate(), localEst);
+ assertEquals(csk3.getLowerBound(2), localLB);
+ assertEquals(csk3.getUpperBound(2), localUB);
+ assertTrue(csk3.isEmpty());
+ assertFalse(csk3.isEstimationMode());
+ assertTrue(csk2.isOrdered());
}
@Test
public void checkEstMode() {
- lgK = 12;
+ int lgK = 12;
int k = 1 << lgK;
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
-
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(mem);
- UpdateSketch local = bldr.buildLocal(shared);
-
- assertTrue(local.isEmpty());
- int u = 3*k;
-
- for (int i = 0; i< u; i++) { local.update(i); }
- waitForBgPropagationToComplete();
+ assertTrue(local.isEmpty());
+ int u = 3*k;
- assertTrue(shared.getRetainedEntries(false) > k);
- }
+ for (int i = 0; i< u; i++) { local.update(i); }
+ waitForBgPropagationToComplete(shared);
+ assertTrue(shared.getRetainedEntries(false) > k);
}
@Test
public void checkErrorBounds() {
- lgK = 9;
+ int lgK = 9;
int k = 1 << lgK;
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
-
- UpdateSketch local = buildSharedReturnLocalSketch(mem);
-
- //Exact mode
- for (int i = 0; i < k; i++ ) { local.update(i); }
- waitForBgPropagationToComplete();
-
- double est = local.getEstimate();
- double lb = local.getLowerBound(2);
- double ub = local.getUpperBound(2);
- assertEquals(est, ub, 0.0);
- assertEquals(est, lb, 0.0);
-
- //Est mode
- int u = 100*k;
- for (int i = k; i < u; i++ ) {
- local.update(i);
- local.update(i); //test duplicate rejection
- }
- waitForBgPropagationToComplete();
- est = local.getEstimate();
- lb = local.getLowerBound(2);
- ub = local.getUpperBound(2);
- assertTrue(est <= ub);
- assertTrue(est >= lb);
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
+
+ //Exact mode
+ for (int i = 0; i < k; i++ ) { local.update(i); }
+ waitForBgPropagationToComplete(shared);
+
+ double est = local.getEstimate();
+ double lb = local.getLowerBound(2);
+ double ub = local.getUpperBound(2);
+ assertEquals(est, ub, 0.0);
+ assertEquals(est, lb, 0.0);
+
+ //Est mode
+ int u = 100*k;
+ for (int i = k; i < u; i++ ) {
+ local.update(i);
+ local.update(i); //test duplicate rejection
}
+ waitForBgPropagationToComplete(shared);
+ est = local.getEstimate();
+ lb = local.getLowerBound(2);
+ ub = local.getUpperBound(2);
+ assertTrue(est <= ub);
+ assertTrue(est >= lb);
}
@Test
public void checkUpperAndLowerBounds() {
- lgK = 9;
+ int lgK = 9;
int k = 1 << lgK;
int u = 2*k;
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
-
- UpdateSketch local = buildSharedReturnLocalSketch(mem);
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
- for (int i = 0; i < u; i++ ) { local.update(i); }
- waitForBgPropagationToComplete();
+ for (int i = 0; i < u; i++ ) { local.update(i); }
+ waitForBgPropagationToComplete(shared);
- double est = local.getEstimate();
- double ub = local.getUpperBound(1);
- double lb = local.getLowerBound(1);
- assertTrue(ub > est);
- assertTrue(lb < est);
- }
+ double est = local.getEstimate();
+ double ub = local.getUpperBound(1);
+ double lb = local.getLowerBound(1);
+ assertTrue(ub > est);
+ assertTrue(lb < est);
}
@Test
public void checkRebuild() {
- lgK = 9;
+ int lgK = 9;
int k = 1 << lgK;
int u = 4*k;
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(mem);
- UpdateSketch local = bldr.buildLocal(shared);
+ assertTrue(local.isEmpty());
- assertTrue(local.isEmpty());
+ for (int i = 0; i< u; i++) { local.update(i); }
+ waitForBgPropagationToComplete(shared);
- for (int i = 0; i< u; i++) { local.update(i); }
- waitForBgPropagationToComplete();
+ assertFalse(local.isEmpty());
+ assertTrue(local.getEstimate() > 0.0);
+ assertTrue(shared.getRetainedEntries(false) >= k);
- assertFalse(local.isEmpty());
- assertTrue(local.getEstimate() > 0.0);
- assertTrue(shared.getRetainedEntries(false) >= k);
-
- shared.rebuild();
- assertEquals(shared.getRetainedEntries(false), k);
- assertEquals(shared.getRetainedEntries(true), k);
- local.rebuild();
- assertEquals(shared.getRetainedEntries(false), k);
- assertEquals(shared.getRetainedEntries(true), k);
- }
+ shared.rebuild();
+ assertEquals(shared.getRetainedEntries(false), k);
+ assertEquals(shared.getRetainedEntries(true), k);
+ local.rebuild();
+ assertEquals(shared.getRetainedEntries(false), k);
+ assertEquals(shared.getRetainedEntries(true), k);
}
@Test
public void checkResetAndStartingSubMultiple() {
- lgK = 9;
+ int lgK = 9;
int k = 1 << lgK;
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
-
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(mem);
- UpdateSketch local = bldr.buildLocal(shared);
-
- assertTrue(local.isEmpty());
-
- int u = 4*k;
- for (int i = 0; i< u; i++) { local.update(i); }
- waitForBgPropagationToComplete();
-
- assertFalse(local.isEmpty());
- assertTrue(shared.getRetainedEntries(false) >= k);
- assertTrue(local.getThetaLong() < Long.MAX_VALUE);
-
- shared.reset();
- local.reset();
- assertTrue(local.isEmpty());
- assertEquals(shared.getRetainedEntries(false), 0);
- assertEquals(local.getEstimate(), 0.0, 0.0);
- assertEquals(local.getThetaLong(), Long.MAX_VALUE);
- }
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
+
+ assertTrue(local.isEmpty());
+
+ int u = 4*k;
+ for (int i = 0; i< u; i++) { local.update(i); }
+ waitForBgPropagationToComplete(shared);
+
+ assertFalse(local.isEmpty());
+ assertTrue(shared.getRetainedEntries(false) >= k);
+ assertTrue(local.getThetaLong() < Long.MAX_VALUE);
+
+ shared.reset();
+ local.reset();
+ assertTrue(local.isEmpty());
+ assertEquals(shared.getRetainedEntries(false), 0);
+ assertEquals(local.getEstimate(), 0.0, 0.0);
+ assertEquals(local.getThetaLong(), Long.MAX_VALUE);
}
@Test
public void checkExactModeMemoryArr() {
- lgK = 12;
+ int lgK = 12;
int k = 1 << lgK;
int u = k;
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
-
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(mem);
- UpdateSketch local = bldr.buildLocal(shared);
- assertTrue(local.isEmpty());
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
+ assertTrue(local.isEmpty());
- for (int i = 0; i< u; i++) { local.update(i); }
- waitForBgPropagationToComplete();
+ for (int i = 0; i< u; i++) { local.update(i); }
+ waitForBgPropagationToComplete(shared);
- assertEquals(local.getEstimate(), u, 0.0);
- assertEquals(shared.getRetainedEntries(false), u);
- }
+ assertEquals(local.getEstimate(), u, 0.0);
+ assertEquals(shared.getRetainedEntries(false), u);
}
@Test
public void checkEstModeMemoryArr() {
- lgK = 12;
+ int lgK = 12;
int k = 1 << lgK;
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
+ assertTrue(local.isEmpty());
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
-
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(mem);
- UpdateSketch local = bldr.buildLocal(shared);
- assertTrue(local.isEmpty());
-
- int u = 3*k;
- for (int i = 0; i< u; i++) { local.update(i); }
- waitForBgPropagationToComplete();
+ int u = 3*k;
+ for (int i = 0; i< u; i++) { local.update(i); }
+ waitForBgPropagationToComplete(shared);
- double est = local.getEstimate();
- assertTrue((est < (u * 1.05)) && (est > (u * 0.95)));
- assertTrue(shared.getRetainedEntries(false) >= k);
- }
+ double est = local.getEstimate();
+ assertTrue((est < (u * 1.05)) && (est > (u * 0.95)));
+ assertTrue(shared.getRetainedEntries(false) >= k);
}
@Test
public void checkEstModeNativeMemory() {
- lgK = 12;
+ int lgK = 12;
int k = 1 << lgK;
- int memCapacity = (k << 4) + (Family.QUICKSELECT.getMinPreLongs() << 3);
-
- try(WritableDirectHandle memHandler = WritableMemory.allocateDirect(memCapacity)) {
-
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(memHandler.get());
- UpdateSketch local = bldr.buildLocal(shared);
- assertTrue(local.isEmpty());
- int u = 3*k;
-
- for (int i = 0; i< u; i++) { local.update(i); }
- waitForBgPropagationToComplete();
- double est = local.getEstimate();
- assertTrue((est < (u * 1.05)) && (est > (u * 0.95)));
- assertTrue(shared.getRetainedEntries(false) >= k);
- }
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
+ assertTrue(local.isEmpty());
+
+ int u = 3*k;
+ for (int i = 0; i< u; i++) { local.update(i); }
+ waitForBgPropagationToComplete(shared);
+ double est = local.getEstimate();
+ assertTrue((est < (u * 1.05)) && (est > (u * 0.95)));
+ assertTrue(shared.getRetainedEntries(false) >= k);
}
@Test
public void checkConstructReconstructFromMemory() {
- lgK = 12;
+ int lgK = 12;
int k = 1 << lgK;
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(h.get());
- UpdateSketch local = bldr.buildLocal(shared);
- assertTrue(local.isEmpty());
- int u = 3*k;
-
- for (int i = 0; i< u; i++) { local.update(i); } //force estimation
- waitForBgPropagationToComplete();
-
- double est1 = local.getEstimate();
- int count1 = shared.getRetainedEntries(false);
- assertTrue((est1 < (u * 1.05)) && (est1 > (u * 0.95)));
- assertTrue(count1 >= k);
-
- byte[] serArr;
- double est2;
-
- serArr = shared.toByteArray();
- WritableMemory mem = WritableMemory.wrap(serArr);
- UpdateSketch recoveredShared = Sketches.wrapUpdateSketch(mem);
-
- //reconstruct to Native/Direct
- final int bytes = Sketch.getMaxUpdateSketchBytes(k);
- final WritableMemory wmem = WritableMemory.allocate(bytes);
- shared = bldr.buildSharedFromSketch(recoveredShared, wmem);
- UpdateSketch local2 = bldr.buildLocal(shared);
- est2 = local2.getEstimate();
-
- assertEquals(est2, est1, 0.0);
- }
+ assertTrue(local.isEmpty());
+ int u = 3*k;
+
+ for (int i = 0; i< u; i++) { local.update(i); } //force estimation
+ waitForBgPropagationToComplete(shared);
+
+ double est1 = local.getEstimate();
+ int count1 = shared.getRetainedEntries(false);
+ assertTrue((est1 < (u * 1.05)) && (est1 > (u * 0.95)));
+ assertTrue(count1 >= k);
+
+ byte[] serArr;
+ double est2;
+
+ serArr = shared.toByteArray();
+ WritableMemory mem = WritableMemory.wrap(serArr);
+ UpdateSketch recoveredShared = Sketches.wrapUpdateSketch(mem);
+
+ //reconstruct to Native/Direct
+ final int bytes = Sketch.getMaxUpdateSketchBytes(k);
+ final WritableMemory wmem = WritableMemory.allocate(bytes);
+ shared = sl.bldr.buildSharedFromSketch(recoveredShared, wmem);
+ UpdateSketch local2 = sl.bldr.buildLocal(shared);
+ est2 = local2.getEstimate();
+
+ assertEquals(est2, est1, 0.0);
}
@Test
@@ -706,73 +519,171 @@ public class ConcurrentDirectQuickSelectSketchTest {
final UpdateSketch sk = bldr.build();
for (int i = 0; i < 1000; i++) { sk.update(i); }
final UpdateSketch shared = bldr.buildSharedFromSketch(sk, null);
- assertEquals(shared.getRetainedEntries(), 1000);
+ assertEquals(shared.getRetainedEntries(true), 1000);
assertFalse(shared.hasMemory());
}
//checks Alex's bug where lgArrLongs > lgNomLongs +1.
@Test
public void checkResizeInBigMem() {
- lgK = 14;
- int k = 1 << lgK;
+ int lgK = 14;
int u = 1 << 20;
- WritableMemory mem = WritableMemory.wrap(new byte[(8*k*16) +24]);
- UpdateSketch local = buildSharedReturnLocalSketch(mem);
- for (int i=0; i<u; i++) { local.update(i); }
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, SEED, useMem, true, 8); //mem is 8X larger than needed
+ UpdateSketch local = sl.local;
+
+ for (int i = 0; i < u; i++) { local.update(i); }
+ }
+
+ @SuppressWarnings("unused")
+ @Test(expectedExceptions = SketchesArgumentException.class)
+ public void checkConstructorKtooSmall() {
+ int lgK = 3;
+ boolean useMem = true;
+ new SharedLocal(lgK, lgK, useMem);
+ }
+
+ @Test(expectedExceptions = SketchesArgumentException.class)
+ public void checkConstructorMemTooSmall() {
+ int lgK = 4;
+ int k = 1 << lgK;
+ WritableMemory wmem = WritableMemory.allocate(k/2);
+ UpdateSketchBuilder bldr = new UpdateSketchBuilder();
+ bldr.setLogNominalEntries(lgK);
+ bldr.buildShared(wmem);
+ }
+
+ @Test(expectedExceptions = SketchesArgumentException.class)
+ public void checkHeapifyIllegalFamilyID_heapify() {
+ int lgK = 9;
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ sl.wmem.putByte(FAMILY_BYTE, (byte) 0); //corrupt the Family ID byte
+ //try to heapify the corrupted mem
+ Sketch.heapify(sl.wmem); //catch in Sketch.constructHeapSketch
}
@Test(expectedExceptions = SketchesArgumentException.class)
public void checkBadLgNomLongs() {
- int k = 16;
- lgK = 4;
- WritableMemory mem = WritableMemory.wrap(new byte[(k*16) +24]);
- buildSharedReturnLocalSketch(mem);
- mem.putByte(LG_NOM_LONGS_BYTE, (byte) 3); //Corrupt LgNomLongs byte
- DirectQuickSelectSketch.writableWrap(mem, DEFAULT_UPDATE_SEED);
+ int lgK = 4;
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ sl.wmem.putByte(LG_NOM_LONGS_BYTE, (byte) 3); //Corrupt LgNomLongs byte
+ DirectQuickSelectSketch.writableWrap(sl.wmem, DEFAULT_UPDATE_SEED);
}
@Test
public void checkBackgroundPropagation() {
- lgK = 4;
+ int lgK = 4;
int k = 1 << lgK;
int u = 10*k;
- try (WritableDirectHandle h = makeNativeMemory(k)) {
- WritableMemory mem = h.get();
-
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(mem);
- UpdateSketch local = bldr.buildLocal(shared);
- ConcurrentHeapThetaBuffer sk1 = (ConcurrentHeapThetaBuffer)local; //for internal checks
-
- assertTrue(local.isEmpty());
-
- int i = 0;
- for (; i< k; i++) {
- local.update(i);
- }
-// waitForBgPropagationToComplete();
- assertFalse(local.isEmpty());
- assertTrue(local.getEstimate() > 0.0);
- long theta1 = ((ConcurrentSharedThetaSketch)shared).getVolatileTheta();
-
- for (; i< u; i++) {
- local.update(i);
- }
- waitForBgPropagationToComplete();
-
- long theta2 = ((ConcurrentSharedThetaSketch)shared).getVolatileTheta();
- int entries = shared.getRetainedEntries(false);
- assertTrue((entries > k) || (theta2 < theta1),
- "entries="+entries+" k="+k+" theta1="+theta1+" theta2="+theta2);
-
- shared.rebuild();
- assertEquals(shared.getRetainedEntries(false), k);
- assertEquals(shared.getRetainedEntries(true), k);
- sk1.rebuild();
- assertEquals(shared.getRetainedEntries(false), k);
- assertEquals(shared.getRetainedEntries(true), k);
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
+ assertTrue(local.isEmpty());
+ ConcurrentHeapThetaBuffer sk1 = (ConcurrentHeapThetaBuffer)local; //for internal checks
+
+ int i = 0;
+ for (; i< k; i++) {
+ local.update(i);
+ }
+ waitForBgPropagationToComplete(shared);
+ assertFalse(local.isEmpty());
+ assertTrue(local.getEstimate() > 0.0);
+ long theta1 = ((ConcurrentSharedThetaSketch)shared).getVolatileTheta();
+
+ for (; i< u; i++) {
+ local.update(i);
}
+ waitForBgPropagationToComplete(shared);
+
+ long theta2 = ((ConcurrentSharedThetaSketch)shared).getVolatileTheta();
+ int entries = shared.getRetainedEntries(false);
+ assertTrue((entries > k) || (theta2 < theta1),
+ "entries="+entries+" k="+k+" theta1="+theta1+" theta2="+theta2);
+
+ shared.rebuild();
+ assertEquals(shared.getRetainedEntries(false), k);
+ assertEquals(shared.getRetainedEntries(true), k);
+ sk1.rebuild();
+ assertEquals(shared.getRetainedEntries(false), k);
+ assertEquals(shared.getRetainedEntries(true), k);
+ }
+
+ @Test(expectedExceptions = SketchesArgumentException.class)
+ public void checkBadSerVer() {
+ int lgK = 9;
+ int k = 1 << lgK;
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
+ assertTrue(local.isEmpty());
+
+ for (int i = 0; i< k; i++) { local.update(i); }
+ waitForBgPropagationToComplete(shared);
+
+ assertFalse(local.isEmpty());
+ assertEquals(local.getEstimate(), k, 0.0);
+ assertEquals(shared.getRetainedEntries(false), k);
+
+ sl.wmem.putByte(SER_VER_BYTE, (byte) 0); //corrupt the SerVer byte
+ Sketch.wrap(sl.wmem);
+ }
+
+ @Test(expectedExceptions = SketchesArgumentException.class)
+ public void checkWrapIllegalFamilyID_wrap() {
+ int lgK = 9;
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+
+ sl.wmem.putByte(FAMILY_BYTE, (byte) 0); //corrupt the Sketch ID byte
+ //try to wrap the corrupted mem
+ Sketch.wrap(sl.wmem); //catch in Sketch.constructDirectSketch
+ }
+
+ @Test(expectedExceptions = SketchesArgumentException.class)
+ public void checkWrapIllegalFamilyID_direct() {
+ int lgK = 9;
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+
+ sl.wmem.putByte(FAMILY_BYTE, (byte) 0); //corrupt the Sketch ID byte
+ //try to wrap the corrupted mem
+ DirectQuickSelectSketch.writableWrap(sl.wmem, DEFAULT_UPDATE_SEED);
+ }
+
+ @Test(expectedExceptions = SketchesArgumentException.class)
+ public void checkHeapifySeedConflict() {
+ int lgK = 9;
+ long seed1 = 1021;
+ long seed2 = DEFAULT_UPDATE_SEED;
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, seed1, useMem, true, 1);
+ UpdateSketch shared = sl.shared;
+
+ Memory srcMem = Memory.wrap(shared.toByteArray());
+ Sketch.heapify(srcMem, seed2);
+ }
+
+ @Test(expectedExceptions = SketchesArgumentException.class)
+ public void checkCorruptLgNomLongs() {
+ int lgK = 4;
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+
+ sl.wmem.putByte(LG_NOM_LONGS_BYTE, (byte)2); //corrupt
+ Sketch.heapify(sl.wmem, DEFAULT_UPDATE_SEED);
+ }
+
+ @Test(expectedExceptions = UnsupportedOperationException.class)
+ public void checkIllegalHashUpdate() {
+ int lgK = 4;
+ boolean useMem = true;
+ SharedLocal sl = new SharedLocal(lgK, lgK, useMem);
+ UpdateSketch shared = sl.shared;
+ shared.hashUpdate(1);
}
@Test
@@ -787,39 +698,20 @@ public class ConcurrentDirectQuickSelectSketchTest {
//System.out.println(s); //disable here
}
- private static final int getMaxBytes(int k) {
- return (k << 4) + (Family.QUICKSELECT.getMinPreLongs() << 3);
+ private static void checkMemoryDirectProxyMethods(Sketch local, Sketch shared) {
+ assertEquals(local.hasMemory(), shared.hasMemory());
+ assertEquals(local.isDirect(), shared.isDirect());
}
- private static WritableDirectHandle makeNativeMemory(int k) {
- return WritableMemory.allocateDirect(getMaxBytes(k));
- }
-
- private UpdateSketch buildSharedReturnLocalSketch(WritableMemory mem) {
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(mem);
- return bldr.buildLocal(shared);
- }
-
- //configures builder for both local and shared
- private UpdateSketchBuilder configureBuilder() {
- final UpdateSketchBuilder bldr = new UpdateSketchBuilder();
- bldr.setLogNominalEntries(lgK);
- bldr.setLocalLogNominalEntries(lgK);
- bldr.setSeed(DEFAULT_UPDATE_SEED);
- return bldr;
- }
-
- private void waitForBgPropagationToComplete() {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- ((ConcurrentSharedThetaSketch)shared).awaitBgPropagationTermination();
- ConcurrentPropagationService.resetExecutorService(Thread.currentThread().getId());
- ((ConcurrentSharedThetaSketch)shared).initBgPropagationService();
+ //Does not check hasMemory(), isDirect()
+ private static void checkOtherProxyMethods(Sketch local, Sketch shared) {
+ assertEquals(local.getCompactBytes(), shared.getCompactBytes());
+ assertEquals(local.getCurrentBytes(), shared.getCurrentBytes());
+ assertEquals(local.getEstimate(), shared.getEstimate());
+ assertEquals(local.getLowerBound(2), shared.getLowerBound(2));
+ assertEquals(local.getUpperBound(2), shared.getUpperBound(2));
+ assertEquals(local.isEmpty(), shared.isEmpty());
+ assertEquals(local.isEstimationMode(), shared.isEstimationMode());
}
}
diff --git a/src/test/java/org/apache/datasketches/theta/ConcurrentHeapQuickSelectSketchTest.java b/src/test/java/org/apache/datasketches/theta/ConcurrentHeapQuickSelectSketchTest.java
index e119dc5..8e03aa3 100644
--- a/src/test/java/org/apache/datasketches/theta/ConcurrentHeapQuickSelectSketchTest.java
+++ b/src/test/java/org/apache/datasketches/theta/ConcurrentHeapQuickSelectSketchTest.java
@@ -32,10 +32,8 @@ import java.util.Arrays;
import org.apache.datasketches.Family;
import org.apache.datasketches.SketchesArgumentException;
-import org.apache.datasketches.SketchesStateException;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
-import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
/**
@@ -43,28 +41,23 @@ import org.testng.annotations.Test;
*/
@SuppressWarnings("javadoc")
public class ConcurrentHeapQuickSelectSketchTest {
- private int lgK;
- private long seed = DEFAULT_UPDATE_SEED;
- //private volatile ConcurrentSharedThetaSketch shared;
- private volatile UpdateSketch shared;
+
@Test(expectedExceptions = SketchesArgumentException.class)
public void checkBadSerVer() {
- int k = 512;
- lgK = 9;
+ int lgK = 9;
+ int k = 1 << lgK;
int u = k;
- seed = DEFAULT_UPDATE_SEED;
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(null);
- UpdateSketch local = bldr.buildLocal(shared);
+ SharedLocal sl = new SharedLocal(lgK);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
assertTrue(local.isEmpty());
for (int i = 0; i< u; i++) {
local.update(i);
}
- waitForBgPropagationToComplete();
+ waitForBgPropagationToComplete(shared);
assertFalse(local.isEmpty());
assertEquals(local.getEstimate(), u, 0.0);
@@ -72,46 +65,42 @@ public class ConcurrentHeapQuickSelectSketchTest {
byte[] serArr = shared.toByteArray();
WritableMemory mem = WritableMemory.wrap(serArr);
- Sketch sk = Sketch.heapify(mem, seed);
+ Sketch sk = Sketch.heapify(mem, sl.seed);
assertTrue(sk instanceof HeapQuickSelectSketch); //Intentional promotion to Parent
mem.putByte(SER_VER_BYTE, (byte) 0); //corrupt the SerVer byte
- Sketch.heapify(mem, seed);
+ Sketch.heapify(mem, sl.seed);
}
@Test
public void checkPropagationNotOrdered() {
- int k = 256;
- lgK = 8;
+ int lgK = 8;
+ int k = 1 << lgK;
int u = 200*k;
- seed = DEFAULT_UPDATE_SEED;
- final UpdateSketchBuilder bldr = configureBuilderNotOrdered();
- assertFalse((1 << bldr.getLocalLgNominalEntries()) == 0);
- //must build shared first
- shared = bldr.buildShared(null);
- UpdateSketch local = bldr.buildLocal(shared);
-
+ SharedLocal sl = new SharedLocal(lgK, 4, false, false);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
+ assertEquals((sl.bldr.getLocalLgNominalEntries()), 4);
assertTrue(local.isEmpty());
for (int i = 0; i < u; i++) {
local.update(i);
}
- waitForBgPropagationToComplete();
+ waitForBgPropagationToComplete(shared);
assertFalse(local.isEmpty());
- assertTrue(shared.getRetainedEntries(false) <= u);
+ assertTrue(shared.getRetainedEntries(true) <= u);
}
@Test(expectedExceptions = SketchesArgumentException.class)
public void checkIllegalSketchID_UpdateSketch() {
- int k = 512;
- lgK = 9;
+ int lgK = 9;
+ int k = 1 << lgK;
int u = k;
- seed = DEFAULT_UPDATE_SEED;
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(null);
- UpdateSketch local = bldr.buildLocal(shared);
+ SharedLocal sl = new SharedLocal(lgK);
+
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
assertTrue(local.isEmpty());
assertTrue(shared instanceof ConcurrentHeapQuickSelectSketch);
for (int i = 0; i< u; i++) {
@@ -127,25 +116,25 @@ public class ConcurrentHeapQuickSelectSketchTest {
mem.putByte(FAMILY_BYTE, (byte) 0); //corrupt the Sketch ID byte
//try to heapify the corruped mem
- Sketch.heapify(mem, seed);
+ Sketch.heapify(mem, sl.seed);
}
@Test(expectedExceptions = SketchesArgumentException.class)
public void checkHeapifySeedConflict() {
- lgK = 9;
- seed = 1021;
+ int lgK = 9;
+ long seed = 1021;
long seed2 = DEFAULT_UPDATE_SEED;
- buildSharedReturnLocalSketch();
- byte[] byteArray = shared.toByteArray();
+ SharedLocal sl = new SharedLocal(lgK, lgK, seed);
+ byte[] byteArray = sl.shared.toByteArray();
Memory srcMem = Memory.wrap(byteArray);
Sketch.heapify(srcMem, seed2);
}
@Test(expectedExceptions = SketchesArgumentException.class)
public void checkHeapifyCorruptLgNomLongs() {
- lgK = 4;
- buildSharedReturnLocalSketch();
- byte[] serArr = shared.toByteArray();
+ int lgK = 4;
+ SharedLocal sl = new SharedLocal(lgK);
+ byte[] serArr = sl.shared.toByteArray();
WritableMemory srcMem = WritableMemory.wrap(serArr);
srcMem.putByte(LG_NOM_LONGS_BYTE, (byte)2); //corrupt
Sketch.heapify(srcMem, DEFAULT_UPDATE_SEED);
@@ -153,24 +142,24 @@ public class ConcurrentHeapQuickSelectSketchTest {
@Test(expectedExceptions = UnsupportedOperationException.class)
public void checkIllegalHashUpdate() {
- lgK = 4;
- buildSharedReturnLocalSketch();
- shared.hashUpdate(1);
+ int lgK = 4;
+ SharedLocal sl = new SharedLocal(lgK);
+ sl.shared.hashUpdate(1);
}
@Test
public void checkHeapifyByteArrayExact() {
- int k = 512;
- lgK = 9;
+ int lgK = 9;
+ int k = 1 << lgK;
int u = k;
- seed = DEFAULT_UPDATE_SEED;
- final UpdateSketchBuilder bldr = configureBuilder();
- UpdateSketch local = buildSharedReturnLocalSketch();
+ SharedLocal sl = new SharedLocal(lgK);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
for (int i=0; i<u; i++) {
local.update(i);
}
- waitForBgPropagationToComplete();
+ waitForBgPropagationToComplete(shared);
byte[] serArr = shared.toByteArray();
Memory srcMem = Memory.wrap(serArr);
@@ -179,8 +168,8 @@ public class ConcurrentHeapQuickSelectSketchTest {
//reconstruct to Native/Direct
final int bytes = Sketch.getMaxUpdateSketchBytes(k);
final WritableMemory wmem = WritableMemory.allocate(bytes);
- shared = bldr.buildSharedFromSketch((UpdateSketch)recoveredShared, wmem);
- UpdateSketch local2 = bldr.buildLocal(shared);
+ shared = sl.bldr.buildSharedFromSketch((UpdateSketch)recoveredShared, wmem);
+ UpdateSketch local2 = sl.bldr.buildLocal(shared);
assertEquals(local2.getEstimate(), u, 0.0);
assertEquals(local2.getLowerBound(2), u, 0.0);
@@ -193,18 +182,18 @@ public class ConcurrentHeapQuickSelectSketchTest {
@Test
public void checkHeapifyByteArrayEstimating() {
- int k = 4096;
- lgK = 12;
+ int lgK = 12;
+ int k = 1 << lgK;
int u = 2*k;
- seed = DEFAULT_UPDATE_SEED;
- final UpdateSketchBuilder bldr = configureBuilder();
- UpdateSketch local = buildSharedReturnLocalSketch();
+ SharedLocal sl = new SharedLocal(lgK);
+ UpdateSketch local = sl.local;
+ UpdateSketch shared = sl.shared;
for (int i=0; i<u; i++) {
local.update(i);
}
- waitForBgPropagationToComplete();
+ waitForBgPropagationToComplete(shared);
double localEst = local.getEstimate();
double localLB = local.getLowerBound(2);
@@ -213,12 +202,12 @@ public class ConcurrentHeapQuickSelectSketchTest {
byte[] serArr = shared.toByteArray();
Memory srcMem = Memory.wrap(serArr);
- UpdateSketch recoveredShared = UpdateSketch.heapify(srcMem, seed);
+ UpdateSketch recoveredShared = UpdateSketch.heapify(srcMem, sl.seed);
final int bytes = Sketch.getMaxUpdateSketchBytes(k);
final WritableMemory wmem = WritableMemory.allocate(bytes);
- shared = bldr.buildSharedFromSketch(recoveredShared, wmem);
- UpdateSketch local2 = bldr.buildLocal(shared);
+ shared = sl.bldr.buildSharedFromSketch(recoveredShared, wmem);
+ UpdateSketch local2 = sl.bldr.buildLocal(shared);
assertEquals(local2.getEstimate(), localEst);
assertEquals(local2.getLowerBound(2), localLB);
assertEquals(local2.getUpperBound(2), localUB);
@@ -229,18 +218,19 @@ public class ConcurrentHeapQuickSelectSketchTest {
@Test
public void checkHeapifyMemoryEstimating() {
- int k = 512;
- lgK = 9;
+ int lgK = 9;
+ int k = 1 << lgK;
int u = 2*k;
- seed = DEFAULT_UPDATE_SEED;
boolean estimating = (u > k);
- final UpdateSketchBuilder bldr = configureBuilder();
- UpdateSketch local = buildSharedReturnLocalSketch();
+
+ SharedLocal sl = new SharedLocal(lgK);
+ UpdateSketch local = sl.local;
+ UpdateSketch shared = sl.shared;
for (int i=0; i<u; i++) {
local.update(i);
}
- waitForBgPropagationToComplete();
+ waitForBgPropagationToComplete(shared);
double localEst = local.getEstimate();
double localLB = local.getLowerBound(2);
@@ -256,8 +246,8 @@ public class ConcurrentHeapQuickSelectSketchTest {
final int bytes = Sketch.getMaxUpdateSketchBytes(k);
final WritableMemory wmem = WritableMemory.allocate(bytes);
- shared = bldr.buildSharedFromSketch(recoveredShared, wmem);
- UpdateSketch local2 = bldr.buildLocal(shared);
+ shared = sl.bldr.buildSharedFromSketch(recoveredShared, wmem);
+ UpdateSketch local2 = sl.bldr.buildLocal(shared);
assertEquals(local2.getEstimate(), localEst);
assertEquals(local2.getLowerBound(2), localLB);
@@ -268,18 +258,16 @@ public class ConcurrentHeapQuickSelectSketchTest {
@Test
public void checkHQStoCompactForms() {
- int k = 512;
- lgK = 9;
+ int lgK = 9;
+ int k = 1 << lgK;
int u = 4*k;
boolean estimating = (u > k);
int maxBytes = (k << 4) + (Family.QUICKSELECT.getMinPreLongs() << 3);
- seed = DEFAULT_UPDATE_SEED;
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(null);
- UpdateSketch local = bldr.buildLocal(shared);
+ SharedLocal sl = new SharedLocal(lgK);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
assertEquals(local.getClass().getSimpleName(), "ConcurrentHeapThetaBuffer");
assertFalse(local.isDirect());
@@ -288,7 +276,7 @@ public class ConcurrentHeapQuickSelectSketchTest {
for (int i=0; i<u; i++) {
local.update(i);
}
- waitForBgPropagationToComplete();
+ waitForBgPropagationToComplete(shared);
shared.rebuild(); //forces size back to k
@@ -296,9 +284,9 @@ public class ConcurrentHeapQuickSelectSketchTest {
double localEst = local.getEstimate();
double localLB = local.getLowerBound(2);
double localUB = local.getUpperBound(2);
- int localBytes = local.getCurrentBytes(); //size stored as UpdateSketch
- int localCompBytes = local.getCompactBytes(); //size stored as CompactSketch
- assertEquals(localBytes, maxBytes);
+ int sharedBytes = shared.getCurrentBytes();
+ int sharedCompBytes = shared.getCompactBytes();
+ assertEquals(sharedBytes, maxBytes);
assertEquals(local.isEstimationMode(), estimating);
CompactSketch comp1, comp2, comp3, comp4;
@@ -310,7 +298,7 @@ public class ConcurrentHeapQuickSelectSketchTest {
assertEquals(comp1.getUpperBound(2), localUB);
assertEquals(comp1.isEmpty(), false);
assertEquals(comp1.isEstimationMode(), estimating);
- assertEquals(comp1.getCompactBytes(), localCompBytes);
+ assertEquals(comp1.getCompactBytes(), sharedCompBytes);
assertEquals(comp1.getClass().getSimpleName(), "HeapCompactSketch");
comp2 = shared.compact(true, null);
@@ -320,10 +308,10 @@ public class ConcurrentHeapQuickSelectSketchTest {
assertEquals(comp2.getUpperBound(2), localUB);
assertEquals(comp2.isEmpty(), false);
assertEquals(comp2.isEstimationMode(), estimating);
- assertEquals(comp2.getCompactBytes(), localCompBytes);
+ assertEquals(comp2.getCompactBytes(), sharedCompBytes);
assertEquals(comp2.getClass().getSimpleName(), "HeapCompactSketch");
- byte[] memArr2 = new byte[localCompBytes];
+ byte[] memArr2 = new byte[sharedCompBytes];
WritableMemory mem2 = WritableMemory.wrap(memArr2); //allocate mem for compact form
comp3 = shared.compact(false, mem2); //load the mem2
@@ -333,7 +321,7 @@ public class ConcurrentHeapQuickSelectSketchTest {
assertEquals(comp3.getUpperBound(2), localUB);
assertEquals(comp3.isEmpty(), false);
assertEquals(comp3.isEstimationMode(), estimating);
- assertEquals(comp3.getCompactBytes(), localCompBytes);
+ assertEquals(comp3.getCompactBytes(), sharedCompBytes);
assertEquals(comp3.getClass().getSimpleName(), "DirectCompactSketch");
mem2.clear();
@@ -344,19 +332,17 @@ public class ConcurrentHeapQuickSelectSketchTest {
assertEquals(comp4.getUpperBound(2), localUB);
assertEquals(comp4.isEmpty(), false);
assertEquals(comp4.isEstimationMode(), estimating);
- assertEquals(comp4.getCompactBytes(), localCompBytes);
+ assertEquals(comp4.getCompactBytes(), sharedCompBytes);
assertEquals(comp4.getClass().getSimpleName(), "DirectCompactSketch");
comp4.toString(false, true, 0, false);
}
@Test
public void checkHQStoCompactEmptyForms() {
- lgK = 9;
- seed = DEFAULT_UPDATE_SEED;
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(null);
- UpdateSketch local = bldr.buildLocal(shared);
+ int lgK = 9;
+ SharedLocal sl = new SharedLocal(lgK);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
println("lgArr: "+ local.getLgArrLongs());
//empty
@@ -396,20 +382,18 @@ public class ConcurrentHeapQuickSelectSketchTest {
@Test
public void checkExactMode() {
- lgK = 12;
- int u = 4096;
- seed = DEFAULT_UPDATE_SEED;
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(null);
- UpdateSketch local = bldr.buildLocal(shared);
+ int lgK = 12;
+ int u = 1 << lgK;
+ SharedLocal sl = new SharedLocal(lgK);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
assertTrue(local.isEmpty());
for (int i = 0; i< u; i++) {
local.update(i);
}
- waitForBgPropagationToComplete();
+ waitForBgPropagationToComplete(shared);
assertEquals(local.getEstimate(), u, 0.0);
assertEquals(shared.getRetainedEntries(false), u);
@@ -417,13 +401,11 @@ public class ConcurrentHeapQuickSelectSketchTest {
@Test
public void checkEstMode() {
- int k = 4096;
- lgK = 12;
- seed = DEFAULT_UPDATE_SEED;
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(null);
- UpdateSketch local = bldr.buildLocal(shared);
+ int lgK = 12;
+ int k = 1 << lgK;
+ SharedLocal sl = new SharedLocal(lgK);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
assertTrue(local.isEmpty());
@@ -431,24 +413,23 @@ public class ConcurrentHeapQuickSelectSketchTest {
for (int i = 0; i< u; i++) {
local.update(i);
}
- waitForBgPropagationToComplete();
+ waitForBgPropagationToComplete(shared);
final int retained = shared.getRetainedEntries(false);
- //final int retained = ((UpdateSketch) shared).getRetainedEntries(false);
assertTrue(retained > k);
- // in general it might be exactly k, but in this case must be greater
+ // it could be exactly k, but in this case must be greater
}
@Test
public void checkErrorBounds() {
- int k = 512;
- lgK = 12;
-
- seed = DEFAULT_UPDATE_SEED;
- UpdateSketch local = buildSharedReturnLocalSketch();
+ int lgK = 9;
+ int k = 1 << lgK;
+ SharedLocal sl = new SharedLocal(lgK);
+ UpdateSketch local = sl.local;
+ UpdateSketch shared = sl.shared;
//Exact mode
- int limit = (int)ConcurrentSharedThetaSketch.computeExactLimit(k, 0);
- for (int i = 0; i < limit; i++ ) {
+ //int limit = (int)ConcurrentSharedThetaSketch.computeExactLimit(lim, 0); //? ask Eshcar
+ for (int i = 0; i < k; i++ ) {
local.update(i);
}
@@ -459,12 +440,12 @@ public class ConcurrentHeapQuickSelectSketchTest {
assertEquals(est, lb, 0.0);
//Est mode
- int u = 10*k;
- for (int i = limit; i < u; i++ ) {
+ int u = 2 * k;
+ for (int i = k; i < u; i++ ) {
local.update(i);
local.update(i); //test duplicate rejection
}
- waitForBgPropagationToComplete();
+ waitForBgPropagationToComplete(shared);
est = local.getEstimate();
lb = local.getLowerBound(2);
ub = local.getUpperBound(2);
@@ -474,14 +455,12 @@ public class ConcurrentHeapQuickSelectSketchTest {
@Test
public void checkRebuild() {
- int k = 16;
- lgK = 4;
-
- seed = DEFAULT_UPDATE_SEED;
- final UpdateSketchBuilder bldr = configureBuilder();
+ int lgK = 4;
+ int k = 1 << lgK;
+ SharedLocal sl = new SharedLocal(lgK);
//must build shared first
- shared = bldr.buildShared(null);
- UpdateSketch local = bldr.buildLocal(shared);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
assertTrue(local.isEmpty());
int t = ((ConcurrentHeapThetaBuffer)local).getHashTableThreshold();
@@ -489,7 +468,7 @@ public class ConcurrentHeapQuickSelectSketchTest {
for (int i = 0; i< t; i++) {
local.update(i);
}
- waitForBgPropagationToComplete();
+ waitForBgPropagationToComplete(shared);
assertFalse(local.isEmpty());
assertTrue(local.getEstimate() > 0.0);
@@ -503,55 +482,43 @@ public class ConcurrentHeapQuickSelectSketchTest {
assertEquals(shared.getRetainedEntries(true), k);
}
- @Test(expectedExceptions = SketchesStateException.class)
+ @Test
public void checkBuilder() {
- lgK = 4;
-
- seed = DEFAULT_UPDATE_SEED;
- final UpdateSketchBuilder bldr = configureBuilderWithNominal();
- assertEquals(bldr.getLocalLgNominalEntries(), lgK);
- assertEquals(bldr.getLgNominalEntries(), lgK);
- println(bldr.toString());
- bldr.buildLocal(shared);
- }
-
- @Test(expectedExceptions = SketchesArgumentException.class)
- public void checkBuilderSmallLgNominal() {
- lgK = 1;
- seed = DEFAULT_UPDATE_SEED;
- configureBuilder();
+ int lgK = 4;
+ SharedLocal sl = new SharedLocal(lgK);
+ assertEquals(sl.bldr.getLocalLgNominalEntries(), lgK);
+ assertEquals(sl.bldr.getLgNominalEntries(), lgK);
+ println(sl.bldr.toString());
}
+ @SuppressWarnings("unused")
@Test(expectedExceptions = SketchesArgumentException.class)
public void checkBuilderSmallNominal() {
- lgK = 2;
- seed = DEFAULT_UPDATE_SEED;
- configureBuilderWithNominal();
+ int lgK = 2; //too small
+ new SharedLocal(lgK);
}
@Test(expectedExceptions = SketchesArgumentException.class)
public void checkNegativeHashes() {
- lgK = 9;
- UpdateSketch qs = buildSharedReturnLocalSketch();
- qs.hashUpdate(-1L);
+ int lgK = 9;
+ SharedLocal sl = new SharedLocal(lgK);
+ UpdateSketch local = sl.local;
+ local.hashUpdate(-1L);
}
@Test
public void checkResetAndStartingSubMultiple() {
- int k = 512;
- lgK = 9;
-
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared();
- UpdateSketch local = bldr.buildLocal(shared);
- //ConcurrentHeapThetaBuffer sk1 = (ConcurrentHeapThetaBuffer)usk; //for internal checks
+ int lgK = 9;
+ int k = 1 << lgK;
+ SharedLocal sl = new SharedLocal(lgK);
+ UpdateSketch shared = sl.shared;
+ UpdateSketch local = sl.local;
assertTrue(local.isEmpty());
int u = 3*k;
for (int i = 0; i< u; i++) { local.update(i); }
- waitForBgPropagationToComplete();
+ waitForBgPropagationToComplete(shared);
assertFalse(local.isEmpty());
assertTrue(shared.getRetainedEntries(false) >= k);
@@ -567,12 +534,10 @@ public class ConcurrentHeapQuickSelectSketchTest {
@Test
public void checkDQStoCompactEmptyForms() {
- lgK = 9;
-
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(null);
- UpdateSketch local = bldr.buildLocal(shared);
+ int lgK = 9;
+ SharedLocal sl = new SharedLocal(lgK);
+ UpdateSketch local = sl.local;
+ UpdateSketch shared = sl.shared;
//empty
local.toString(false, true, 0, false); //exercise toString
@@ -582,7 +547,7 @@ public class ConcurrentHeapQuickSelectSketchTest {
double uskUB = local.getUpperBound(2);
assertFalse(local.isEstimationMode());
- int bytes = local.getCompactBytes(); //compact form
+ int bytes = local.getCompactBytes();
assertEquals(bytes, 8);
byte[] memArr2 = new byte[bytes];
WritableMemory mem2 = WritableMemory.wrap(memArr2);
@@ -608,75 +573,68 @@ public class ConcurrentHeapQuickSelectSketchTest {
@Test(expectedExceptions = SketchesArgumentException.class)
public void checkMinReqBytes() {
- int k = 16;
- lgK = 4;
- UpdateSketch local = buildSharedReturnLocalSketch();
- for (int i = 0; i < (4 * k); i++) { local.update(i); }
- waitForBgPropagationToComplete();
- byte[] byteArray = shared.toByteArray();
- byte[] badBytes = Arrays.copyOfRange(byteArray, 0, 24);
+ int lgK = 4;
+ int k = 1 << lgK;
+ SharedLocal sl = new SharedLocal(lgK);
+ for (int i = 0; i < (4 * k); i++) { sl.local.update(i); }
+ waitForBgPropagationToComplete(sl.shared);
+ byte[] byteArray = sl.shared.toByteArray();
+ byte[] badBytes = Arrays.copyOfRange(byteArray, 0, 24); //corrupt no. bytes
Memory mem = Memory.wrap(badBytes);
Sketch.heapify(mem);
}
@Test(expectedExceptions = SketchesArgumentException.class)
public void checkThetaAndLgArrLongs() {
- int k = 16;
- lgK = 4;
- UpdateSketch local = buildSharedReturnLocalSketch();
- for (int i = 0; i < k; i++) { local.update(i); }
- waitForBgPropagationToComplete();
- byte[] badArray = shared.toByteArray();
+ int lgK = 4;
+ int k = 1 << lgK;
+ SharedLocal sl = new SharedLocal(lgK);
+ for (int i = 0; i < k; i++) { sl.local.update(i); }
+ waitForBgPropagationToComplete(sl.shared);
+ byte[] badArray = sl.shared.toByteArray();
WritableMemory mem = WritableMemory.wrap(badArray);
- PreambleUtil.insertLgArrLongs(mem, 4);
- PreambleUtil.insertThetaLong(mem, Long.MAX_VALUE / 2);
+ PreambleUtil.insertLgArrLongs(mem, 4); //corrupt
+ PreambleUtil.insertThetaLong(mem, Long.MAX_VALUE / 2); //corrupt
Sketch.heapify(mem);
}
@Test
public void checkFamily() {
- UpdateSketch local = buildSharedReturnLocalSketch();
+ SharedLocal sl = new SharedLocal();
+ UpdateSketch local = sl.local;
assertEquals(local.getFamily(), Family.QUICKSELECT);
}
@Test
public void checkBackgroundPropagation() {
- int k = 16;
- lgK = 4;
+ int lgK = 4;
+ int k = 1 << lgK;
int u = 5*k;
- final UpdateSketchBuilder bldr = configureBuilderWithCache();
- //must build shared first
- shared = bldr.buildShared(null);
- UpdateSketch local = bldr.buildLocal(shared);
-
-
- assertTrue(local.isEmpty());
+ SharedLocal sl = new SharedLocal(lgK);
+ assertTrue(sl.local.isEmpty());
int i = 0;
- for (; i< k; i++) {
- local.update(i);
- }
- waitForBgPropagationToComplete();
- assertFalse(local.isEmpty());
- assertTrue(local.getEstimate() > 0.0);
- long theta1 = ((ConcurrentHeapQuickSelectSketch)shared).getVolatileTheta();
+ for (; i < k; i++) { sl.local.update(i); } //exact
+ waitForBgPropagationToComplete(sl.shared);
- for (; i< u; i++) {
- local.update(i);
- }
- waitForBgPropagationToComplete();
+ assertFalse(sl.local.isEmpty());
+ assertTrue(sl.local.getEstimate() > 0.0);
+ long theta1 = sl.sharedIf.getVolatileTheta();
- long theta2 = ((ConcurrentHeapQuickSelectSketch)shared).getVolatileTheta();
- int entries = shared.getRetainedEntries(false);
+ for (; i < u; i++) { sl.local.update(i); } //continue, make it estimating
+ waitForBgPropagationToComplete(sl.shared);
+
+ long theta2 = sl.sharedIf.getVolatileTheta();
+ int entries = sl.shared.getRetainedEntries(false);
assertTrue((entries > k) || (theta2 < theta1),
"entries= " + entries + " k= " + k + " theta1= " + theta1 + " theta2= " + theta2);
- shared.rebuild();
- assertEquals(shared.getRetainedEntries(false), k);
- assertEquals(shared.getRetainedEntries(true), k);
- local.rebuild();
- assertEquals(shared.getRetainedEntries(false), k);
- assertEquals(shared.getRetainedEntries(true), k);
+ sl.shared.rebuild();
+ assertEquals(sl.shared.getRetainedEntries(false), k);
+ assertEquals(sl.shared.getRetainedEntries(true), k);
+ sl.local.rebuild();
+ assertEquals(sl.shared.getRetainedEntries(false), k);
+ assertEquals(sl.shared.getRetainedEntries(true), k);
}
@Test
@@ -704,10 +662,8 @@ public class ConcurrentHeapQuickSelectSketchTest {
@Test(expectedExceptions = UnsupportedOperationException.class)
public void checkToByteArray() {
- UpdateSketchBuilder bldr = new UpdateSketchBuilder();
- UpdateSketch shared = bldr.buildShared();
- UpdateSketch local = bldr.buildLocal(shared);
- local.toByteArray();
+ SharedLocal sl = new SharedLocal();
+ sl.local.toByteArray();
}
@Test
@@ -715,11 +671,6 @@ public class ConcurrentHeapQuickSelectSketchTest {
println("PRINTING: "+this.getClass().getName());
}
- @AfterMethod
- public void clearShared() {
- shared = null;
- }
-
/**
* @param s value to print
*/
@@ -727,58 +678,71 @@ public class ConcurrentHeapQuickSelectSketchTest {
//System.out.println(s); //disable here
}
- private UpdateSketch buildSharedReturnLocalSketch() {
- final UpdateSketchBuilder bldr = configureBuilder();
- //must build shared first
- shared = bldr.buildShared(null);
- return bldr.buildLocal(shared);
- }
-
- //configures builder for both local and shared
- private UpdateSketchBuilder configureBuilder() {
+ static class SharedLocal {
+ static final long DefaultSeed = DEFAULT_UPDATE_SEED;
+ final UpdateSketch shared;
+ final ConcurrentSharedThetaSketch sharedIf;
+ final UpdateSketch local;
+ final int sharedLgK;
+ final int localLgK;
+ final long seed;
+ final WritableMemory wmem;
final UpdateSketchBuilder bldr = new UpdateSketchBuilder();
- bldr.setLogNominalEntries(lgK);
- bldr.setLocalLogNominalEntries(lgK);
- bldr.setSeed(seed);
- return bldr;
- }
- //configures builder for both local and shared
- private UpdateSketchBuilder configureBuilderNotOrdered() {
- final UpdateSketchBuilder bldr = new UpdateSketchBuilder();
- bldr.setLogNominalEntries(lgK);
- bldr.setLocalLogNominalEntries(4);
- bldr.setSeed(seed);
- bldr.setPropagateOrderedCompact(false);
- return bldr;
- }
+ SharedLocal() {
+ this(9, 9, DefaultSeed, false, true, 1);
+ }
- //configures builder for both local and shared
- private UpdateSketchBuilder configureBuilderWithNominal() {
- final UpdateSketchBuilder bldr = configureBuilder();
- int k = 1 << lgK;
- bldr.setLocalNominalEntries(k);
- bldr.setNominalEntries(k);
- assertTrue(bldr.getPropagateOrderedCompact());
- assertEquals(bldr.getSeed(), DEFAULT_UPDATE_SEED);
- return bldr;
- }
+ SharedLocal(int lgK) {
+ this(lgK, lgK, DefaultSeed, false, true, 1);
+ }
- //configures builder for both local and shared
- private UpdateSketchBuilder configureBuilderWithCache() {
- final UpdateSketchBuilder bldr = configureBuilder();
- return bldr;
+ SharedLocal(int sharedLgK, int localLgK) {
+ this(sharedLgK, localLgK, DefaultSeed, false, true, 1);
+ }
+
+ SharedLocal(int sharedLgK, int localLgK, long seed) {
+ this(sharedLgK, localLgK, seed, false, true, 1);
+ }
+
+ SharedLocal(int sharedLgK, int localLgK, boolean useMem) {
+ this(sharedLgK, localLgK, DefaultSeed, useMem, true, 1);
+ }
+
+ SharedLocal(int sharedLgK, int localLgK, boolean useMem, boolean ordered) {
+ this(sharedLgK, localLgK, DefaultSeed, useMem, ordered, 1);
+ }
+
+ SharedLocal(int sharedLgK, int localLgK, long seed, boolean useMem, boolean ordered, int memMult) {
+ this.sharedLgK = sharedLgK;
+ this.localLgK = localLgK;
+ this.seed = seed;
+ if (useMem) {
+ int bytes = (((4 << sharedLgK) * memMult) + (Family.QUICKSELECT.getMaxPreLongs())) << 3;
+ wmem = WritableMemory.allocate(bytes);
+ } else {
+ wmem = null;
+ }
+ bldr.setLogNominalEntries(sharedLgK);
+ bldr.setLocalLogNominalEntries(localLgK);
+ bldr.setPropagateOrderedCompact(ordered);
+ bldr.setSeed(this.seed);
+ shared = bldr.buildShared(wmem);
+ local = bldr.buildLocal(shared);
+ sharedIf = (ConcurrentSharedThetaSketch) shared;
+ }
}
- private void waitForBgPropagationToComplete() {
+ static void waitForBgPropagationToComplete(UpdateSketch shared) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
- ((ConcurrentHeapQuickSelectSketch)shared).awaitBgPropagationTermination();
+ ConcurrentSharedThetaSketch csts = (ConcurrentSharedThetaSketch)shared;
+ csts.awaitBgPropagationTermination();
ConcurrentPropagationService.resetExecutorService(Thread.currentThread().getId());
- ((ConcurrentHeapQuickSelectSketch)shared).initBgPropagationService();
+ csts.initBgPropagationService();
}
}
diff --git a/src/test/java/org/apache/datasketches/theta/DirectQuickSelectSketchTest.java b/src/test/java/org/apache/datasketches/theta/DirectQuickSelectSketchTest.java
index 0eaf1e4..26e7b4c 100644
--- a/src/test/java/org/apache/datasketches/theta/DirectQuickSelectSketchTest.java
+++ b/src/test/java/org/apache/datasketches/theta/DirectQuickSelectSketchTest.java
@@ -133,7 +133,7 @@ public class DirectQuickSelectSketchTest {
assertTrue(sk1.hasMemory());
assertFalse(sk1.isDirty());
assertTrue(sk1.hasMemory());
- assertEquals(sk1.getCurrentPreambleLongs(false), 3);
+ assertEquals(sk1.getCurrentPreambleLongs(), 3);
UpdateSketch sk2 = Sketches.heapifyUpdateSketch(mem);
assertEquals(sk2.getEstimate(), sk1est);
diff --git a/src/test/java/org/apache/datasketches/theta/DirectUnionTest.java b/src/test/java/org/apache/datasketches/theta/DirectUnionTest.java
index 0975b40..eb372a0 100644
--- a/src/test/java/org/apache/datasketches/theta/DirectUnionTest.java
+++ b/src/test/java/org/apache/datasketches/theta/DirectUnionTest.java
@@ -780,7 +780,7 @@ public class DirectUnionTest {
CompactSketch csk = union1.getResult();
assertTrue(csk.getTheta() < 0.2);
- assertEquals(csk.getRetainedEntries(), 16384);
+ assertEquals(csk.getRetainedEntries(true), 16384);
final double est = csk.getEstimate();
assertTrue(est > 98663.0);
assertTrue(est < 101530.0);
@@ -806,7 +806,7 @@ public class DirectUnionTest {
CompactSketch csk = union1.getResult();
assertTrue(csk.getTheta() < 0.2);
- assertEquals(csk.getRetainedEntries(), 16384);
+ assertEquals(csk.getRetainedEntries(true), 16384);
final double est = csk.getEstimate();
assertTrue(est > 98663.0);
assertTrue(est < 101530.0);
diff --git a/src/test/java/org/apache/datasketches/theta/SetOpsCornerCasesTest.java b/src/test/java/org/apache/datasketches/theta/SetOpsCornerCasesTest.java
index d29a84d..73180b0 100644
--- a/src/test/java/org/apache/datasketches/theta/SetOpsCornerCasesTest.java
+++ b/src/test/java/org/apache/datasketches/theta/SetOpsCornerCasesTest.java
@@ -329,15 +329,15 @@ public class SetOpsCornerCasesTest {
CompactSketch skHeap2 = generate(EST_HEAP, k);
CompactSketch csk;
csk = PairwiseSetOperations.union(skNull, skHeap1, k);
- Assert.assertEquals(csk.getRetainedEntries(), k);
+ Assert.assertEquals(csk.getRetainedEntries(true), k);
csk = PairwiseSetOperations.union(skEmpty, skHeap1, k);
- Assert.assertEquals(csk.getRetainedEntries(), k);
+ Assert.assertEquals(csk.getRetainedEntries(true), k);
csk = PairwiseSetOperations.union(skHeap1, skNull, k);
- Assert.assertEquals(csk.getRetainedEntries(), k);
+ Assert.assertEquals(csk.getRetainedEntries(true), k);
csk = PairwiseSetOperations.union(skHeap1, skEmpty, k);
- Assert.assertEquals(csk.getRetainedEntries(), k);
+ Assert.assertEquals(csk.getRetainedEntries(true), k);
csk = PairwiseSetOperations.union(skHeap1, skHeap2, k);
- Assert.assertEquals(csk.getRetainedEntries(), k);
+ Assert.assertEquals(csk.getRetainedEntries(true), k);
}
@@ -366,7 +366,7 @@ public class SetOpsCornerCasesTest {
csk = generate(State.EMPTY, k);
assertEquals(csk.isEmpty(), true);
assertEquals(csk.isEstimationMode(), false);
- assertEquals(csk.getRetainedEntries(), 0);
+ assertEquals(csk.getRetainedEntries(true), 0);
assertEquals(csk.getThetaLong(), Long.MAX_VALUE);
assertEquals(csk.isDirect(), false);
assertEquals(csk.hasMemory(), false);
@@ -375,7 +375,7 @@ public class SetOpsCornerCasesTest {
csk = generate(State.SINGLE, k);
assertEquals(csk.isEmpty(), false);
assertEquals(csk.isEstimationMode(), false);
- assertEquals(csk.getRetainedEntries(), 1);
+ assertEquals(csk.getRetainedEntries(true), 1);
assertEquals(csk.getThetaLong(), Long.MAX_VALUE);
assertEquals(csk.isDirect(), false);
assertEquals(csk.hasMemory(), false);
@@ -384,7 +384,7 @@ public class SetOpsCornerCasesTest {
csk = generate(State.EXACT, k);
assertEquals(csk.isEmpty(), false);
assertEquals(csk.isEstimationMode(), false);
- assertEquals(csk.getRetainedEntries(), k);
+ assertEquals(csk.getRetainedEntries(true), k);
assertEquals(csk.getThetaLong(), Long.MAX_VALUE);
assertEquals(csk.isDirect(), false);
assertEquals(csk.hasMemory(), false);
@@ -393,7 +393,7 @@ public class SetOpsCornerCasesTest {
csk = generate(State.EST_HEAP, k);
assertEquals(csk.isEmpty(), false);
assertEquals(csk.isEstimationMode(), true);
- assertEquals(csk.getRetainedEntries() > k, true);
+ assertEquals(csk.getRetainedEntries(true) > k, true);
assertEquals(csk.getThetaLong() < Long.MAX_VALUE, true);
assertEquals(csk.isDirect(), false);
assertEquals(csk.hasMemory(), false);
@@ -402,7 +402,7 @@ public class SetOpsCornerCasesTest {
csk = generate(State.THLT1_CNT0_FALSE, k);
assertEquals(csk.isEmpty(), false);
assertEquals(csk.isEstimationMode(), true);
- assertEquals(csk.getRetainedEntries(), 0);
+ assertEquals(csk.getRetainedEntries(true), 0);
assertEquals(csk.getThetaLong() < Long.MAX_VALUE, true);
assertEquals(csk.isDirect(), false);
assertEquals(csk.hasMemory(), false);
@@ -411,7 +411,7 @@ public class SetOpsCornerCasesTest {
csk = generate(State.THEQ1_CNT0_TRUE, k);
assertEquals(csk.isEmpty(), true);
assertEquals(csk.isEstimationMode(), false);
- assertEquals(csk.getRetainedEntries(), 0);
+ assertEquals(csk.getRetainedEntries(true), 0);
assertEquals(csk.getThetaLong() < Long.MAX_VALUE, false);
assertEquals(csk.isDirect(), false);
assertEquals(csk.hasMemory(), false);
@@ -420,7 +420,7 @@ public class SetOpsCornerCasesTest {
csk = generate(State.EST_MEMORY_UNORDERED, k);
assertEquals(csk.isEmpty(), false);
assertEquals(csk.isEstimationMode(), true);
- assertEquals(csk.getRetainedEntries() > k, true);
+ assertEquals(csk.getRetainedEntries(true) > k, true);
assertEquals(csk.getThetaLong() < Long.MAX_VALUE, true);
assertEquals(csk.isDirect(), false);
assertEquals(csk.hasMemory(), true);
@@ -467,13 +467,13 @@ public class SetOpsCornerCasesTest {
case THLT1_CNT0_FALSE : {
sk = Sketches.updateSketchBuilder().setP((float)0.5).setNominalEntries(k).build();
sk.update(7); //above theta
- assert(sk.getRetainedEntries() == 0);
+ assert(sk.getRetainedEntries(true) == 0);
csk = sk.compact(true, null); //compact as {Th < 1.0, 0, F}
break;
}
case THEQ1_CNT0_TRUE : {
sk = Sketches.updateSketchBuilder().setP((float)0.5).setNominalEntries(k).build();
- assert(sk.getRetainedEntries() == 0);
+ assert(sk.getRetainedEntries(true) == 0);
csk = sk.compact(true, null); //compact as {Th < 1.0, 0, T}
break;
}
diff --git a/src/test/java/org/apache/datasketches/theta/SingleItemSketchTest.java b/src/test/java/org/apache/datasketches/theta/SingleItemSketchTest.java
index d2fa4db..01226bf 100644
--- a/src/test/java/org/apache/datasketches/theta/SingleItemSketchTest.java
+++ b/src/test/java/org/apache/datasketches/theta/SingleItemSketchTest.java
@@ -167,7 +167,7 @@ public class SingleItemSketchTest {
public void checkRestricted() {
SingleItemSketch sis = SingleItemSketch.create(1);
assertNull(sis.getMemory());
- assertEquals(sis.getCurrentPreambleLongs(true), 1);
+ assertEquals(sis.getCompactPreambleLongs(), 1);
}
@Test
diff --git a/src/test/java/org/apache/datasketches/theta/SketchTest.java b/src/test/java/org/apache/datasketches/theta/SketchTest.java
index fa11112..1193e38 100644
--- a/src/test/java/org/apache/datasketches/theta/SketchTest.java
+++ b/src/test/java/org/apache/datasketches/theta/SketchTest.java
@@ -65,8 +65,8 @@ public class SketchTest {
int lowQSPreLongs = Family.QUICKSELECT.getMinPreLongs();
int lowCompPreLongs = Family.COMPACT.getMinPreLongs();
UpdateSketch sketch = UpdateSketch.builder().setNominalEntries(k).build(); // QS Sketch
- assertEquals(sketch.getCurrentPreambleLongs(false), lowQSPreLongs);
- assertEquals(sketch.getCurrentPreambleLongs(true), 1); //compact form
+ assertEquals(sketch.getCurrentPreambleLongs(), lowQSPreLongs);
+ assertEquals(sketch.getCompactPreambleLongs(), 1); //compact form
assertEquals(sketch.getCurrentDataLongs(), k*2);
assertEquals(sketch.getCurrentBytes(), (k*2*8) + (lowQSPreLongs << 3));
assertEquals(sketch.getCompactBytes(), lowCompPreLongs << 3);
@@ -84,8 +84,8 @@ public class SketchTest {
sketch.update(i);
}
- assertEquals(sketch.getCurrentPreambleLongs(false), lowQSPreLongs);
- assertEquals(sketch.getCurrentPreambleLongs(true), 2); //compact form
+ assertEquals(sketch.getCurrentPreambleLongs(), lowQSPreLongs);
+ assertEquals(sketch.getCompactPreambleLongs(), 2); //compact form
assertEquals(sketch.getCurrentDataLongs(), k*2);
assertEquals(sketch.getCurrentBytes(), (k*2*8) + (lowQSPreLongs << 3));
assertEquals(sketch.getCompactBytes(), (k*8) + (2*8)); //compact form //FAILS HERE
@@ -99,8 +99,8 @@ public class SketchTest {
}
int curCount = sketch.getRetainedEntries(true);
- assertEquals(sketch.getCurrentPreambleLongs(false), lowQSPreLongs);
- assertEquals(sketch.getCurrentPreambleLongs(true), 3); //compact form
+ assertEquals(sketch.getCurrentPreambleLongs(), lowQSPreLongs);
+ assertEquals(sketch.getCompactPreambleLongs(), 3); //compact form
assertEquals(sketch.getCurrentDataLongs(), k*2);
assertEquals(sketch.getCurrentBytes(), (k*2*8) + (lowQSPreLongs << 3));
assertEquals(sketch.getCompactBytes(), (curCount*8) + (3*8)); //compact form
diff --git a/src/test/java/org/apache/datasketches/theta/UnionImplTest.java b/src/test/java/org/apache/datasketches/theta/UnionImplTest.java
index c7d2b50..9ae9740 100644
--- a/src/test/java/org/apache/datasketches/theta/UnionImplTest.java
+++ b/src/test/java/org/apache/datasketches/theta/UnionImplTest.java
@@ -197,7 +197,7 @@ public class UnionImplTest {
for (int i = 0; i < k; i++) { sk.update(i); }
double est1 = sk.getEstimate();
- int bytes = Sketches.getMaxCompactSketchBytes(sk.getRetainedEntries());
+ int bytes = Sketches.getMaxCompactSketchBytes(sk.getRetainedEntries(true));
try (WritableDirectHandle h = WritableMemory.allocateDirect(bytes)) {
WritableMemory wmem = h.get();
CompactSketch csk = sk.compact(true, wmem); //ordered, direct
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org