You are viewing a plain-text version of this content; the canonical link to it was not preserved in this copy.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/01/24 23:13:51 UTC
[beam] 03/04: Make Sketch AutoValue + Javadoc update
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit cc9182a6b257726b13b69073aa321da1ad4d6c37
Author: ArnaudFnr <ar...@gmail.com>
AuthorDate: Fri Jan 12 01:41:34 2018 +0100
Make Sketch AutoValue + Javadoc update
---
.../extensions/sketching/ApproximateDistinct.java | 9 +-
.../extensions/sketching/SketchFrequencies.java | 126 ++++++++++-----------
.../sketching/SketchFrequenciesTest.java | 14 +--
3 files changed, 70 insertions(+), 79 deletions(-)
diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java
index 80446a0..3fea951 100644
--- a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java
+++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java
@@ -231,7 +231,7 @@ public final class ApproximateDistinct {
/**
* Implementation of {@link #globally()}.
*
- * @param <InputT>
+ * @param <InputT> the type of the elements in the input {@link PCollection}
*/
@AutoValue
public abstract static class GloballyDistinct<InputT>
@@ -282,8 +282,8 @@ public final class ApproximateDistinct {
/**
* Implementation of {@link #perKey()}.
*
- * @param <K>
- * @param <V>
+ * @param <K> type of the keys mapping the elements
+ * @param <V> type of the values being combined per key
*/
@AutoValue
public abstract static class PerKeyDistinct<K, V>
@@ -360,7 +360,8 @@ public final class ApproximateDistinct {
try {
coder.verifyDeterministic();
} catch (Coder.NonDeterministicException e) {
- throw new IllegalArgumentException("Coder is not deterministic ! " + e.getMessage(), e);
+ throw new IllegalArgumentException("Coder must be deterministic to perform this sketch."
+ + e.getMessage(), e);
}
return new ApproximateDistinctFn<>(12, 0, coder);
}
diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/SketchFrequencies.java b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/SketchFrequencies.java
index 6508333..bb62053 100644
--- a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/SketchFrequencies.java
+++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/SketchFrequencies.java
@@ -82,8 +82,8 @@ import org.apache.beam.sdk.values.PCollection;
*
* <p>With the default values, this gives a depth of 200 and a width of 10.
*
- * <p><b>WARNING:</b> The relative error concerns the total number of distinct number elements
- * in the stream. Thus, an element having 1000 occurrences in a streams of 1 million distinct
+ * <p><b>WARNING:</b> The relative error concerns the total number of distinct elements
+ * in a stream. Thus, an element having 1000 occurrences in a stream of 1 million distinct
* elements will have 1% of 1 million as relative error, i.e. 10 000. This means the frequency
* is 1000 +/- 10 000 for this element. Therefore this is obvious that the relative error must
* be really low in very large streams. <br>
@@ -154,14 +154,14 @@ import org.apache.beam.sdk.values.PCollection;
* {@literal new DoFn<Long, KV<MyObject, Long>>()} {
* {@literal @ProcessElement}
* public void procesElement(ProcessContext c) {
- * Long object = c.element();
+ * Long elem = c.element();
* CountMinSketch sketch = c.sideInput(sketchView);
* sketch.estimateCount(elem, coder);
* }}).withSideInputs(sketchView));
* }
* </code></pre>
*
- * <h3>Example 4 : Using the CombineFn</h3>
+ * <h3>Example 4: Using the CombineFn</h3>
*
* <p>The {@code CombineFn} does the same thing as the {@code PTransform}s but
* it can be used for doing stateful processing or in
@@ -215,7 +215,7 @@ public final class SketchFrequencies {
/**
* Implementation of {@link #globally()}.
*
- * @param <InputT>
+ * @param <InputT> the type of the elements in the input {@link PCollection}
*/
@AutoValue
public abstract static class GlobalSketch<InputT>
@@ -255,15 +255,15 @@ public final class SketchFrequencies {
return input.apply("Compute Count-Min Sketch",
Combine.<InputT, Sketch<InputT>>globally(CountMinSketchFn
.<InputT>create(input.getCoder())
- .withAccuracy(this.relativeError(), this.confidence())));
+ .withAccuracy(relativeError(), confidence())));
}
}
/**
* Implementation of {@link #perKey()}.
*
- * @param <K>
- * @param <V>
+ * @param <K> type of the keys mapping the elements
+ * @param <V> type of the values being combined per key
*/
@AutoValue
public abstract static class PerKeySketch<K, V>
@@ -277,7 +277,7 @@ public final class SketchFrequencies {
static <K, V> Builder<K, V> builder() {
return new AutoValue_SketchFrequencies_PerKeySketch.Builder<K, V>()
- .setRelativeError(0.001)
+ .setRelativeError(0.01)
.setConfidence(0.999);
}
@@ -304,7 +304,7 @@ public final class SketchFrequencies {
return input.apply("Compute Count-Min Sketch perKey",
Combine.<K, V, Sketch<V>>perKey(CountMinSketchFn
.<V>create(inputCoder.getValueCoder())
- .withAccuracy(this.relativeError(), this.confidence())));
+ .withAccuracy(relativeError(), confidence())));
}
}
@@ -331,15 +331,17 @@ public final class SketchFrequencies {
}
/**
- * Returns an {@link CountMinSketchFn} combiner with the given input coder.
+ * Returns a {@link CountMinSketchFn} combiner with the given input coder. <br>
+ * <b>Warning:</b> the coder must be deterministic.
*
* @param coder the coder that encodes the elements' type
*/
- public static <InputT> CountMinSketchFn<InputT>create(Coder<InputT> coder) {
+ public static <InputT> CountMinSketchFn<InputT> create(Coder<InputT> coder) {
try {
coder.verifyDeterministic();
} catch (Coder.NonDeterministicException e) {
- throw new IllegalArgumentException("Coder is not deterministic ! " + e.getMessage(), e);
+ throw new IllegalArgumentException("Coder must be deterministic to perform this sketch."
+ + e.getMessage(), e);
}
return new CountMinSketchFn<>(coder, 0.01, 0.999);
}
@@ -360,13 +362,13 @@ public final class SketchFrequencies {
}
if (confidence <= 0D || confidence >= 1D) {
- throw new IllegalArgumentException("The confidence must be comprised between 0 and 1");
+ throw new IllegalArgumentException("The confidence must be between 0 and 1");
}
- return new CountMinSketchFn<InputT>(this.inputCoder, epsilon, confidence);
+ return new CountMinSketchFn<InputT>(inputCoder, epsilon, confidence);
}
@Override public Sketch<InputT> createAccumulator() {
- return new Sketch<InputT>(this.epsilon, this.confidence);
+ return Sketch.<InputT>create(epsilon, confidence);
}
@Override public Sketch<InputT> addInput(Sketch<InputT> accumulator, InputT element) {
@@ -378,18 +380,17 @@ public final class SketchFrequencies {
@Override public Sketch<InputT> mergeAccumulators(Iterable<Sketch<InputT>> accumulators) {
Iterator<Sketch<InputT>> it = accumulators.iterator();
Sketch<InputT> first = it.next();
- CountMinSketch mergedSketches = first.sketch;
+ CountMinSketch mergedSketches = first.sketch();
try {
while (it.hasNext()) {
- mergedSketches = CountMinSketch.merge(mergedSketches, it.next().sketch);
+ mergedSketches = CountMinSketch.merge(mergedSketches, it.next().sketch());
}
} catch (FrequencyMergeException e) {
// Should never happen because every instantiated accumulator are of the same type.
- throw new IllegalStateException("The accumulators cannot be merged !" + e.getMessage());
+ throw new IllegalStateException("The accumulators cannot be merged:" + e.getMessage());
}
- first.sketch = mergedSketches;
- return first;
+ return Sketch.<InputT>create(mergedSketches);
}
/** Output the whole structure so it can be queried, reused or stored easily. */
@@ -419,30 +420,46 @@ public final class SketchFrequencies {
}
/**
- * Wrapper of StreamLib's Count-Min Sketch to fit with Beam requirements.
+ * Wrap StreamLib's Count-Min Sketch to support counting all user types by hashing
+ * the encoded user type using the supplied deterministic coder. This is required
+ * since objects in Apache Beam are considered equal if their encodings are equal.
*/
- public static class Sketch<T> implements Serializable {
+ @AutoValue
+ public abstract static class Sketch<T> implements Serializable {
static final int SEED = 123456;
- int width;
- int depth;
- CountMinSketch sketch;
+ static <T> Sketch<T> create(double eps, double conf) {
+ int width = (int) Math.ceil(2 / eps);
+ int depth = (int) Math.ceil(-Math.log(1 - conf) / Math.log(2));
+ return new AutoValue_SketchFrequencies_Sketch<T>(
+ depth,
+ width,
+ new CountMinSketch(depth, width, SEED));
+ }
- public Sketch(double eps, double confidence) {
- this.width = (int) Math.ceil(2 / eps);
- this.depth = (int) Math.ceil(-Math.log(1 - confidence) / Math.log(2));
- sketch = new CountMinSketch(depth, width, SEED);
+ static <T> Sketch<T> create(int depth, int width, CountMinSketch sketch) {
+ return new AutoValue_SketchFrequencies_Sketch<T>(
+ depth,
+ width,
+ sketch);
}
- private Sketch(int width, int depth, CountMinSketch sketch) {
- this.sketch = sketch;
- this.width = width;
- this.depth = depth;
+ static <T> Sketch<T> create(CountMinSketch sketch) {
+ int width = (int) Math.ceil(2 / sketch.getRelativeError());
+ int depth = (int) Math.ceil(-Math.log(1 - sketch.getConfidence()) / Math.log(2));
+ return new AutoValue_SketchFrequencies_Sketch<T>(
+ depth,
+ width,
+ sketch);
}
+ abstract int depth();
+ abstract int width();
+ abstract CountMinSketch sketch();
+
public void add(T element, long count, Coder<T> coder) {
- sketch.add(hashElement(element, coder), count);
+ sketch().add(hashElement(element, coder), count);
}
public void add(T element, Coder<T> coder) {
@@ -458,41 +475,14 @@ public final class SketchFrequencies {
}
}
- public int getWidth() {
- return this.width;
- }
-
- public int getDepth() {
- return this.depth;
- }
-
/**
* Utility class to retrieve the estimate frequency of an element from a {@link
* CountMinSketch}.
*/
public long estimateCount(T element, Coder<T> coder) {
- return sketch.estimateCount(hashElement(element, coder));
+ return sketch().estimateCount(hashElement(element, coder));
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- final Sketch<T> other = (Sketch<T>) o;
-
- if (depth != other.depth) {
- return false;
- }
- if (width != other.width) {
- return false;
- }
- return sketch.equals(other.sketch);
- }
}
/**
@@ -509,9 +499,9 @@ public final class SketchFrequencies {
if (value == null) {
throw new CoderException("cannot encode a null Count-min Sketch");
}
- INT_CODER.encode(value.width, outStream);
- INT_CODER.encode(value.depth, outStream);
- BYTE_ARRAY_CODER.encode(CountMinSketch.serialize(value.sketch), outStream);
+ INT_CODER.encode(value.width(), outStream);
+ INT_CODER.encode(value.depth(), outStream);
+ BYTE_ARRAY_CODER.encode(CountMinSketch.serialize(value.sketch()), outStream);
}
@Override
@@ -520,7 +510,7 @@ public final class SketchFrequencies {
int depth = INT_CODER.decode(inStream);
byte[] sketchBytes = BYTE_ARRAY_CODER.decode(inStream);
CountMinSketch sketch = CountMinSketch.deserialize(sketchBytes);
- return new Sketch<T>(width, depth, sketch);
+ return Sketch.<T>create(depth, width, sketch);
}
@Override
@@ -537,7 +527,7 @@ public final class SketchFrequencies {
// 4L * 4 is for depth and width (ints) in Sketch<T> and in the Count-Min sketch
// 8L * depth * (width + 1) is a factorization for the sizes of table (long[depth][width])
// and hashA (long[depth])
- return 8L + 4L * 4 + 8L * value.getDepth() * (value.getWidth() + 1);
+ return 8L + 4L * 4 + 8L * value.depth() * (value.width() + 1);
}
}
}
diff --git a/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/SketchFrequenciesTest.java b/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/SketchFrequenciesTest.java
index ea773e6..34d9ed1 100644
--- a/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/SketchFrequenciesTest.java
+++ b/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/SketchFrequenciesTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sketching;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
import java.io.Serializable;
import java.util.ArrayList;
@@ -43,7 +44,6 @@ import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
-import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -115,7 +115,7 @@ public class SketchFrequenciesTest implements Serializable {
// n sketches each containing [0, 1, 2]
for (int i = 0; i < nOccurrences; i++) {
- Sketch<Integer> sketch = new Sketch<Integer>(eps, conf);
+ Sketch<Integer> sketch = Sketch.<Integer>create(eps, conf);
for (int j = 0; j < size; j++) {
sketch.add(j, coder);
}
@@ -125,7 +125,7 @@ public class SketchFrequenciesTest implements Serializable {
CountMinSketchFn<Integer> fn = CountMinSketchFn.create(coder).withAccuracy(eps, conf);
Sketch<Integer> merged = fn.mergeAccumulators(sketches);
for (int i = 0; i < size; i++) {
- Assert.assertEquals(nOccurrences, merged.estimateCount(i, coder));
+ assertEquals(nOccurrences, merged.estimateCount(i, coder));
}
}
@@ -135,7 +135,7 @@ public class SketchFrequenciesTest implements Serializable {
long occurrences = 2L; // occurrence of each user in the stream
double eps = 0.01;
double conf = 0.8;
- Sketch<GenericRecord> sketch = new Sketch<>(eps, conf);
+ Sketch<GenericRecord> sketch = Sketch.<GenericRecord>create(eps, conf);
Schema schema =
SchemaBuilder.record("User")
.fields()
@@ -149,13 +149,13 @@ public class SketchFrequenciesTest implements Serializable {
newRecord.put("Pseudo", "User" + i);
newRecord.put("Age", i);
sketch.add(newRecord, occurrences, coder);
- Assert.assertEquals("Test API", occurrences, sketch.estimateCount(newRecord, coder));
+ assertEquals("Test API", occurrences, sketch.estimateCount(newRecord, coder));
}
}
@Test
public void testCoder() throws Exception {
- Sketch<Integer> cMSketch = new Sketch<Integer>(0.01, 0.8);
+ Sketch<Integer> cMSketch = Sketch.<Integer>create(0.01, 0.8);
Coder<Integer> coder = VarIntCoder.of();
for (int i = 0; i < 3; i++) {
cMSketch.add(i, coder);
@@ -196,7 +196,7 @@ public class SketchFrequenciesTest implements Serializable {
@Override
public Void apply(Sketch<T> sketch) {
for (int i = 0; i < elements.length; i++) {
- Assert.assertEquals((long) expectedHits[i], sketch.estimateCount(elements[i], coder));
+ assertEquals((long) expectedHits[i], sketch.estimateCount(elements[i], coder));
}
return null;
}
--
To stop receiving notification emails like this one, please contact
lcwik@apache.org.