You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/03/01 04:10:51 UTC

[4/9] beam git commit: BEAM-1423 Sample should comply with PTransform style guide

BEAM-1423 Sample should comply with PTransform style guide


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a8e2387d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a8e2387d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a8e2387d

Branch: refs/heads/master
Commit: a8e2387d37ac2df5c4627ba8f1a9b4ce1ef417b5
Parents: 0b9afe8
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Feb 7 16:24:08 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Sample.java  | 121 +++++++++++++------
 .../beam/sdk/runners/TransformTreeTest.java     |  19 +--
 .../apache/beam/sdk/transforms/SampleTest.java  |   2 +-
 3 files changed, 95 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a8e2387d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index 7d4e630..3734f7b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -38,9 +38,17 @@ import org.apache.beam.sdk.values.PCollectionView;
  * {@code PTransform}s for taking samples of the elements in a
  * {@code PCollection}, or samples of the values associated with each
  * key in a {@code PCollection} of {@code KV}s.
+ *
+ * <p>{@link #combineFn} can also be used manually, in combination with state and with the
+ * {@link Combine} transform.
  */
 public class Sample {
 
+  /** Returns a {@link CombineFn} that computes a fixed-sized sample of its inputs. */
+  public static <T> CombineFn<T, ?, Iterable<T>> combineFn(int sampleSize) {
+    return new FixedSizedSampleFn<>(sampleSize);
+  }
+
   /**
    * {@code Sample#any(long)} takes a {@code PCollection<T>} and a limit, and
    * produces a new {@code PCollection<T>} containing up to limit
@@ -64,72 +72,65 @@ public class Sample {
    * @param limit the number of elements to take from the input
    */
   public static <T> PTransform<PCollection<T>, PCollection<T>> any(long limit) {
-    return new SampleAny<>(limit);
+    return new Any<>(limit);
   }
 
   /**
-   * Returns a {@code PTransform} that takes a {@code PCollection<T>},
-   * selects {@code sampleSize} elements, uniformly at random, and returns a
-   * {@code PCollection<Iterable<T>>} containing the selected elements.
-   * If the input {@code PCollection} has fewer than
-   * {@code sampleSize} elements, then the output {@code Iterable<T>}
-   * will be all the input's elements.
+   * Returns a {@code PTransform} that takes a {@code PCollection<T>}, selects {@code sampleSize}
+   * elements, uniformly at random, and returns a {@code PCollection<Iterable<T>>} containing the
+   * selected elements. If the input {@code PCollection} has fewer than {@code sampleSize} elements,
+   * then the output {@code Iterable<T>} will be all the input's elements.
    *
    * <p>Example of use:
-   * <pre> {@code
+   *
+   * <pre>{@code
    * PCollection<String> pc = ...;
    * PCollection<Iterable<String>> sampleOfSize10 =
    *     pc.apply(Sample.fixedSizeGlobally(10));
-   * } </pre>
+   * }
+   * </pre>
    *
    * @param sampleSize the number of elements to select; must be {@code >= 0}
    * @param <T> the type of the elements
    */
-  public static <T> PTransform<PCollection<T>, PCollection<Iterable<T>>>
-      fixedSizeGlobally(int sampleSize) {
-    return Combine.globally(new FixedSizedSampleFn<T>(sampleSize));
+  public static <T> PTransform<PCollection<T>, PCollection<Iterable<T>>> fixedSizeGlobally(
+      int sampleSize) {
+    return new FixedSizeGlobally<>(sampleSize);
   }
 
   /**
-   * Returns a {@code PTransform} that takes an input
-   * {@code PCollection<KV<K, V>>} and returns a
-   * {@code PCollection<KV<K, Iterable<V>>>} that contains an output
-   * element mapping each distinct key in the input
-   * {@code PCollection} to a sample of {@code sampleSize} values
-   * associated with that key in the input {@code PCollection}, taken
-   * uniformly at random.  If a key in the input {@code PCollection}
-   * has fewer than {@code sampleSize} values associated with it, then
-   * the output {@code Iterable<V>} associated with that key will be
-   * all the values associated with that key in the input
-   * {@code PCollection}.
+   * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, V>>} and returns a
+   * {@code PCollection<KV<K, Iterable<V>>>} that contains an output element mapping each distinct
+   * key in the input {@code PCollection} to a sample of {@code sampleSize} values associated with
+   * that key in the input {@code PCollection}, taken uniformly at random. If a key in the input
+   * {@code PCollection} has fewer than {@code sampleSize} values associated with it, then the
+   * output {@code Iterable<V>} associated with that key will be all the values associated with that
+   * key in the input {@code PCollection}.
    *
    * <p>Example of use:
-   * <pre> {@code
+   *
+   * <pre>{@code
    * PCollection<KV<String, Integer>> pc = ...;
    * PCollection<KV<String, Iterable<Integer>>> sampleOfSize10PerKey =
    *     pc.apply(Sample.<String, Integer>fixedSizePerKey(10));
-   * } </pre>
+   * }
+   * </pre>
    *
-   * @param sampleSize the number of values to select for each
-   * distinct key; must be {@code >= 0}
+   * @param sampleSize the number of values to select for each distinct key; must be {@code >= 0}
    * @param <K> the type of the keys
    * @param <V> the type of the values
    */
-  public static <K, V> PTransform<PCollection<KV<K, V>>,
-                                  PCollection<KV<K, Iterable<V>>>>
-      fixedSizePerKey(int sampleSize) {
-    return Combine.perKey(new FixedSizedSampleFn<V>(sampleSize));
+  public static <K, V>
+      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> fixedSizePerKey(
+          int sampleSize) {
+    return new FixedSizePerKey<>(sampleSize);
   }
 
 
   /////////////////////////////////////////////////////////////////////////////
 
-  /**
-   * A {@link PTransform} that takes a {@code PCollection<T>} and a limit, and
-   * produces a new {@code PCollection<T>} containing up to limit
-   * elements of the input {@code PCollection}.
-   */
-  public static class SampleAny<T> extends PTransform<PCollection<T>, PCollection<T>> {
+  /** Implementation of {@link #any(long)}. */
+  private static class Any<T> extends PTransform<PCollection<T>, PCollection<T>> {
     private final long limit;
 
     /**
@@ -137,7 +138,7 @@ public class Sample {
      * produces a new PCollection containing up to {@code limit}
      * elements of its input {@code PCollection}.
      */
-    private SampleAny(long limit) {
+    private Any(long limit) {
       checkArgument(limit >= 0, "Expected non-negative limit, received %s.", limit);
       this.limit = limit;
     }
@@ -162,6 +163,50 @@ public class Sample {
     }
   }
 
+  /** Implementation of {@link #fixedSizeGlobally(int)}. */
+  private static class FixedSizeGlobally<T>
+      extends PTransform<PCollection<T>, PCollection<Iterable<T>>> {
+    private final int sampleSize;
+
+    private FixedSizeGlobally(int sampleSize) {
+      this.sampleSize = sampleSize;
+    }
+
+    @Override
+    public PCollection<Iterable<T>> expand(PCollection<T> input) {
+      return input.apply(Combine.globally(new FixedSizedSampleFn<T>(sampleSize)));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("sampleSize", sampleSize)
+          .withLabel("Sample Size"));
+    }
+  }
+
+  /** Implementation of {@link #fixedSizePerKey(int)}. */
+  private static class FixedSizePerKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+    private final int sampleSize;
+
+    private FixedSizePerKey(int sampleSize) {
+      this.sampleSize = sampleSize;
+    }
+
+    @Override
+    public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+      return input.apply(Combine.<K, V, Iterable<V>>perKey(new FixedSizedSampleFn<V>(sampleSize)));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("sampleSize", sampleSize)
+          .withLabel("Sample Size"));
+    }
+  }
+
   /**
    * A {@link DoFn} that returns up to limit elements from the side input PCollection.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/a8e2387d/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 6a6e0fc..1f9beb3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -34,8 +34,10 @@ import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.Sample;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -62,7 +64,7 @@ public class TransformTreeTest {
   enum TransformsSeen {
     READ,
     WRITE,
-    SAMPLE_ANY
+    COMBINE_GLOBALLY
   }
 
   /**
@@ -120,7 +122,8 @@ public class TransformTreeTest {
     File outputFile = tmpFolder.newFile();
 
     p.apply("ReadMyFile", TextIO.Read.from(inputFile.getPath()))
-        .apply(Sample.<String>any(10))
+        .apply(Combine.globally(Sample.<String>combineFn(10)))
+        .apply(Flatten.<String>iterables())
         .apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
 
     final EnumSet<TransformsSeen> visited =
@@ -132,8 +135,8 @@ public class TransformTreeTest {
       @Override
       public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
         PTransform<?, ?> transform = node.getTransform();
-        if (transform instanceof Sample.SampleAny) {
-          assertTrue(visited.add(TransformsSeen.SAMPLE_ANY));
+        if (transform instanceof Combine.Globally) {
+          assertTrue(visited.add(TransformsSeen.COMBINE_GLOBALLY));
           assertNotNull(node.getEnclosingNode());
           assertTrue(node.isCompositeNode());
         } else if (transform instanceof Write.Bound) {
@@ -148,8 +151,8 @@ public class TransformTreeTest {
       @Override
       public void leaveCompositeTransform(TransformHierarchy.Node node) {
         PTransform<?, ?> transform = node.getTransform();
-        if (transform instanceof Sample.SampleAny) {
-          assertTrue(left.add(TransformsSeen.SAMPLE_ANY));
+        if (transform instanceof Combine.Globally) {
+          assertTrue(left.add(TransformsSeen.COMBINE_GLOBALLY));
         }
       }
 
@@ -157,7 +160,7 @@ public class TransformTreeTest {
       public void visitPrimitiveTransform(TransformHierarchy.Node node) {
         PTransform<?, ?> transform = node.getTransform();
         // Combine.Globally is a composite, should not be visited here.
-        assertThat(transform, not(instanceOf(Sample.SampleAny.class)));
+        assertThat(transform, not(instanceOf(Combine.Globally.class)));
         assertThat(transform, not(instanceOf(Write.Bound.class)));
         if (transform instanceof Read.Bounded
             && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
@@ -167,7 +170,7 @@ public class TransformTreeTest {
     });
 
     assertTrue(visited.equals(EnumSet.allOf(TransformsSeen.class)));
-    assertTrue(left.equals(EnumSet.of(TransformsSeen.SAMPLE_ANY)));
+    assertTrue(left.equals(EnumSet.of(TransformsSeen.COMBINE_GLOBALLY)));
   }
 
   @Test(expected = IllegalArgumentException.class)

http://git-wip-us.apache.org/repos/asf/beam/blob/a8e2387d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 88b0145..8e426c6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -318,7 +318,7 @@ public class SampleTest {
 
     @Test
     public void testSampleGetName() {
-      assertEquals("Sample.SampleAny", Sample.<String>any(1).getName());
+      assertEquals("Sample.Any", Sample.<String>any(1).getName());
     }
 
     @Test