You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/01/24 23:13:51 UTC

[beam] 03/04: Make Sketch AutoValue + Javadoc update

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit cc9182a6b257726b13b69073aa321da1ad4d6c37
Author: ArnaudFnr <ar...@gmail.com>
AuthorDate: Fri Jan 12 01:41:34 2018 +0100

    Make Sketch AutoValue + Javadoc update
---
 .../extensions/sketching/ApproximateDistinct.java  |   9 +-
 .../extensions/sketching/SketchFrequencies.java    | 126 ++++++++++-----------
 .../sketching/SketchFrequenciesTest.java           |  14 +--
 3 files changed, 70 insertions(+), 79 deletions(-)

diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java
index 80446a0..3fea951 100644
--- a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java
+++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java
@@ -231,7 +231,7 @@ public final class ApproximateDistinct {
   /**
    * Implementation of {@link #globally()}.
    *
-   * @param <InputT>
+   * @param <InputT> the type of the elements in the input {@link PCollection}
    */
   @AutoValue
   public abstract static class GloballyDistinct<InputT>
@@ -282,8 +282,8 @@ public final class ApproximateDistinct {
   /**
    * Implementation of {@link #perKey()}.
    *
-   * @param <K>
-   * @param <V>
+   * @param <K> type of the keys mapping the elements
+   * @param <V> type of the values being combined per key
    */
   @AutoValue
   public abstract static class PerKeyDistinct<K, V>
@@ -360,7 +360,8 @@ public final class ApproximateDistinct {
       try {
         coder.verifyDeterministic();
       } catch (Coder.NonDeterministicException e) {
-        throw new IllegalArgumentException("Coder is not deterministic ! " + e.getMessage(), e);
+        throw new IllegalArgumentException("Coder must be deterministic to perform this sketch. "
+                + e.getMessage(), e);
       }
       return new ApproximateDistinctFn<>(12, 0, coder);
     }
diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/SketchFrequencies.java b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/SketchFrequencies.java
index 6508333..bb62053 100644
--- a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/SketchFrequencies.java
+++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/SketchFrequencies.java
@@ -82,8 +82,8 @@ import org.apache.beam.sdk.values.PCollection;
  *
  * <p>With the default values, this gives a depth of 200 and a width of 10.
  *
- * <p><b>WARNING:</b> The relative error concerns the total number of distinct number elements
- * in the stream. Thus, an element having 1000 occurrences in a streams of 1 million distinct
+ * <p><b>WARNING:</b> The relative error concerns the total number of distinct elements
+ * in a stream. Thus, an element having 1000 occurrences in a stream of 1 million distinct
  * elements will have 1% of 1 million as relative error, i.e. 10 000. This means the frequency
  * is 1000 +/- 10 000 for this element. Therefore this is obvious that the relative error must
  * be really low in very large streams. <br>
@@ -154,14 +154,14 @@ import org.apache.beam.sdk.values.PCollection;
  *        {@literal new DoFn<Long, KV<MyObject, Long>>()} {
  *          {@literal @ProcessElement}
  *           public void procesElement(ProcessContext c) {
- *             Long object = c.element();
+ *             Long elem = c.element();
  *             CountMinSketch sketch = c.sideInput(sketchView);
  *             sketch.estimateCount(elem, coder);
  *            }}).withSideInputs(sketchView));
  * }
  * </code></pre>
  *
- * <h3>Example 4 : Using the CombineFn</h3>
+ * <h3>Example 4: Using the CombineFn</h3>
  *
  * <p>The {@code CombineFn} does the same thing as the {@code PTransform}s but
  * it can be used for doing stateful processing or in
@@ -215,7 +215,7 @@ public final class SketchFrequencies {
   /**
    * Implementation of {@link #globally()}.
    *
-   * @param <InputT>
+   * @param <InputT> the type of the elements in the input {@link PCollection}
    */
   @AutoValue
   public abstract static class GlobalSketch<InputT>
@@ -255,15 +255,15 @@ public final class SketchFrequencies {
       return input.apply("Compute Count-Min Sketch",
                       Combine.<InputT, Sketch<InputT>>globally(CountMinSketchFn
                               .<InputT>create(input.getCoder())
-                              .withAccuracy(this.relativeError(), this.confidence())));
+                              .withAccuracy(relativeError(), confidence())));
     }
   }
 
   /**
    * Implementation of {@link #perKey()}.
    *
-   * @param <K>
-   * @param <V>
+   * @param <K> type of the keys mapping the elements
+   * @param <V> type of the values being combined per key
    */
   @AutoValue
   public abstract static class PerKeySketch<K, V>
@@ -277,7 +277,7 @@ public final class SketchFrequencies {
 
     static <K, V> Builder<K, V> builder() {
       return new AutoValue_SketchFrequencies_PerKeySketch.Builder<K, V>()
-              .setRelativeError(0.001)
+              .setRelativeError(0.01)
               .setConfidence(0.999);
     }
 
@@ -304,7 +304,7 @@ public final class SketchFrequencies {
       return input.apply("Compute Count-Min Sketch perKey",
               Combine.<K, V, Sketch<V>>perKey(CountMinSketchFn
                       .<V>create(inputCoder.getValueCoder())
-                      .withAccuracy(this.relativeError(), this.confidence())));
+                      .withAccuracy(relativeError(), confidence())));
     }
   }
 
@@ -331,15 +331,17 @@ public final class SketchFrequencies {
     }
 
     /**
-     * Returns an {@link CountMinSketchFn} combiner with the given input coder.
+     * Returns a {@link CountMinSketchFn} combiner with the given input coder. <br>
+     * <b>Warning:</b> the coder must be deterministic.
      *
      * @param coder the coder that encodes the elements' type
      */
-    public static <InputT> CountMinSketchFn<InputT>create(Coder<InputT> coder) {
+    public static <InputT> CountMinSketchFn<InputT> create(Coder<InputT> coder) {
       try {
         coder.verifyDeterministic();
       } catch (Coder.NonDeterministicException e) {
-        throw new IllegalArgumentException("Coder is not deterministic ! " + e.getMessage(), e);
+        throw new IllegalArgumentException("Coder must be deterministic to perform this sketch. "
+                + e.getMessage(), e);
       }
       return new CountMinSketchFn<>(coder, 0.01, 0.999);
     }
@@ -360,13 +362,13 @@ public final class SketchFrequencies {
       }
 
       if (confidence <= 0D || confidence >= 1D) {
-        throw new IllegalArgumentException("The confidence must be comprised between 0 and 1");
+        throw new IllegalArgumentException("The confidence must be between 0 and 1");
       }
-      return new CountMinSketchFn<InputT>(this.inputCoder, epsilon, confidence);
+      return new CountMinSketchFn<InputT>(inputCoder, epsilon, confidence);
     }
 
     @Override public Sketch<InputT> createAccumulator() {
-      return new Sketch<InputT>(this.epsilon, this.confidence);
+      return Sketch.<InputT>create(epsilon, confidence);
     }
 
     @Override public Sketch<InputT> addInput(Sketch<InputT> accumulator, InputT element) {
@@ -378,18 +380,17 @@ public final class SketchFrequencies {
     @Override public Sketch<InputT> mergeAccumulators(Iterable<Sketch<InputT>> accumulators) {
       Iterator<Sketch<InputT>> it = accumulators.iterator();
       Sketch<InputT> first = it.next();
-      CountMinSketch mergedSketches = first.sketch;
+      CountMinSketch mergedSketches = first.sketch();
       try {
         while (it.hasNext()) {
-          mergedSketches = CountMinSketch.merge(mergedSketches, it.next().sketch);
+          mergedSketches = CountMinSketch.merge(mergedSketches, it.next().sketch());
         }
       } catch (FrequencyMergeException e) {
         // Should never happen because every instantiated accumulator are of the same type.
-        throw new IllegalStateException("The accumulators cannot be merged !" + e.getMessage());
+        throw new IllegalStateException("The accumulators cannot be merged: " + e.getMessage(), e);
       }
-      first.sketch = mergedSketches;

-      return first;
+      return Sketch.<InputT>create(first.depth(), first.width(), mergedSketches);
     }
 
     /** Output the whole structure so it can be queried, reused or stored easily. */
@@ -419,30 +420,46 @@ public final class SketchFrequencies {
   }
 
   /**
-   * Wrapper of StreamLib's Count-Min Sketch to fit with Beam requirements.
+   * Wrap StreamLib's Count-Min Sketch to support counting all user types by hashing
+   * the encoded user type using the supplied deterministic coder. This is required
+   * since objects in Apache Beam are considered equal if their encodings are equal.
    */
-  public static class Sketch<T> implements Serializable {
+  @AutoValue
+  public abstract static class Sketch<T> implements Serializable {
 
     static final int SEED = 123456;
 
-    int width;
-    int depth;
-    CountMinSketch sketch;
+    static <T> Sketch<T> create(double eps, double conf) {
+      int width = (int) Math.ceil(2 / eps);
+      int depth = (int) Math.ceil(-Math.log(1 - conf) / Math.log(2));
+      return new AutoValue_SketchFrequencies_Sketch<T>(
+              depth,
+              width,
+              new CountMinSketch(depth, width, SEED));
+    }

-    public Sketch(double eps, double confidence) {
-      this.width = (int) Math.ceil(2 / eps);
-      this.depth = (int) Math.ceil(-Math.log(1 - confidence) / Math.log(2));
-      sketch = new CountMinSketch(depth, width, SEED);
+    static <T> Sketch<T> create(int depth, int width, CountMinSketch sketch) {
+      return new AutoValue_SketchFrequencies_Sketch<T>(
+              depth,
+              width,
+              sketch);
     }
 
-    private Sketch(int width, int depth, CountMinSketch sketch) {
-      this.sketch = sketch;
-      this.width = width;
-      this.depth = depth;
+    static <T> Sketch<T> create(CountMinSketch sketch) {
+      int width = (int) Math.ceil(2 / sketch.getRelativeError());
+      int depth = (int) Math.ceil(-Math.log(1 - sketch.getConfidence()) / Math.log(2));
+      return new AutoValue_SketchFrequencies_Sketch<T>(
+              depth,
+              width,
+              sketch);
     }
 
+    abstract int depth();
+    abstract int width();
+    abstract CountMinSketch sketch();
+
     public void add(T element, long count, Coder<T> coder) {
-      sketch.add(hashElement(element, coder), count);
+      sketch().add(hashElement(element, coder), count);
     }
 
     public void add(T element, Coder<T> coder) {
@@ -458,41 +475,14 @@ public final class SketchFrequencies {
       }
     }
 
-    public int getWidth() {
-      return this.width;
-    }
-
-    public int getDepth() {
-      return this.depth;
-    }
-
     /**
      * Utility class to retrieve the estimate frequency of an element from a {@link
      * CountMinSketch}.
      */
     public long estimateCount(T element, Coder<T> coder) {
-      return sketch.estimateCount(hashElement(element, coder));
+      return sketch().estimateCount(hashElement(element, coder));
     }
 
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      final Sketch<T> other = (Sketch<T>) o;
-
-      if (depth != other.depth) {
-        return false;
-      }
-      if (width != other.width) {
-        return false;
-      }
-      return sketch.equals(other.sketch);
-    }
   }
 
   /**
@@ -509,9 +499,9 @@ public final class SketchFrequencies {
       if (value == null) {
         throw new CoderException("cannot encode a null Count-min Sketch");
       }
-      INT_CODER.encode(value.width, outStream);
-      INT_CODER.encode(value.depth, outStream);
-      BYTE_ARRAY_CODER.encode(CountMinSketch.serialize(value.sketch), outStream);
+      INT_CODER.encode(value.width(), outStream);
+      INT_CODER.encode(value.depth(), outStream);
+      BYTE_ARRAY_CODER.encode(CountMinSketch.serialize(value.sketch()), outStream);
     }
 
     @Override
@@ -520,7 +510,7 @@ public final class SketchFrequencies {
       int depth = INT_CODER.decode(inStream);
       byte[] sketchBytes = BYTE_ARRAY_CODER.decode(inStream);
       CountMinSketch sketch = CountMinSketch.deserialize(sketchBytes);
-      return new Sketch<T>(width, depth, sketch);
+      return Sketch.<T>create(depth, width, sketch);
     }
 
     @Override
@@ -537,7 +527,7 @@ public final class SketchFrequencies {
         // 4L * 4 is for depth and width (ints) in Sketch<T> and in the Count-Min sketch
         // 8L * depth * (width + 1) is a factorization for the sizes of table (long[depth][width])
         // and hashA (long[depth])
-        return 8L + 4L * 4 + 8L * value.getDepth() * (value.getWidth() + 1);
+        return 8L + 4L * 4 + 8L * value.depth() * (value.width() + 1);
       }
     }
   }
diff --git a/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/SketchFrequenciesTest.java b/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/SketchFrequenciesTest.java
index ea773e6..34d9ed1 100644
--- a/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/SketchFrequenciesTest.java
+++ b/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/SketchFrequenciesTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sketching;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -43,7 +44,6 @@ import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
-import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -115,7 +115,7 @@ public class SketchFrequenciesTest implements Serializable {
 
     // n sketches each containing [0, 1, 2]
     for (int i = 0; i < nOccurrences; i++) {
-      Sketch<Integer> sketch = new Sketch<Integer>(eps, conf);
+      Sketch<Integer> sketch = Sketch.<Integer>create(eps, conf);
       for (int j = 0; j < size; j++) {
         sketch.add(j, coder);
       }
@@ -125,7 +125,7 @@ public class SketchFrequenciesTest implements Serializable {
     CountMinSketchFn<Integer> fn = CountMinSketchFn.create(coder).withAccuracy(eps, conf);
     Sketch<Integer> merged = fn.mergeAccumulators(sketches);
     for (int i = 0; i < size; i++) {
-      Assert.assertEquals(nOccurrences, merged.estimateCount(i, coder));
+      assertEquals(nOccurrences, merged.estimateCount(i, coder));
     }
   }
 
@@ -135,7 +135,7 @@ public class SketchFrequenciesTest implements Serializable {
     long occurrences = 2L; // occurrence of each user in the stream
     double eps = 0.01;
     double conf = 0.8;
-    Sketch<GenericRecord> sketch = new Sketch<>(eps, conf);
+    Sketch<GenericRecord> sketch = Sketch.<GenericRecord>create(eps, conf);
     Schema schema =
             SchemaBuilder.record("User")
                     .fields()
@@ -149,13 +149,13 @@ public class SketchFrequenciesTest implements Serializable {
       newRecord.put("Pseudo", "User" + i);
       newRecord.put("Age", i);
       sketch.add(newRecord, occurrences, coder);
-      Assert.assertEquals("Test API", occurrences, sketch.estimateCount(newRecord, coder));
+      assertEquals("Test API", occurrences, sketch.estimateCount(newRecord, coder));
     }
   }
 
   @Test
   public void testCoder() throws Exception {
-    Sketch<Integer> cMSketch = new Sketch<Integer>(0.01, 0.8);
+    Sketch<Integer> cMSketch = Sketch.<Integer>create(0.01, 0.8);
     Coder<Integer> coder = VarIntCoder.of();
     for (int i = 0; i < 3; i++) {
       cMSketch.add(i, coder);
@@ -196,7 +196,7 @@ public class SketchFrequenciesTest implements Serializable {
     @Override
     public Void apply(Sketch<T> sketch) {
       for (int i = 0; i < elements.length; i++) {
-        Assert.assertEquals((long) expectedHits[i], sketch.estimateCount(elements[i], coder));
+        assertEquals((long) expectedHits[i], sketch.estimateCount(elements[i], coder));
       }
       return null;
     }

-- 
To stop receiving notification emails like this one, please contact
lcwik@apache.org.