You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/03/01 04:10:51 UTC
[4/9] beam git commit: BEAM-1423 Sample should comply with PTransform
style guide
BEAM-1423 Sample should comply with PTransform style guide
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a8e2387d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a8e2387d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a8e2387d
Branch: refs/heads/master
Commit: a8e2387d37ac2df5c4627ba8f1a9b4ce1ef417b5
Parents: 0b9afe8
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Feb 7 16:24:08 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/Sample.java | 121 +++++++++++++------
.../beam/sdk/runners/TransformTreeTest.java | 19 +--
.../apache/beam/sdk/transforms/SampleTest.java | 2 +-
3 files changed, 95 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a8e2387d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index 7d4e630..3734f7b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -38,9 +38,17 @@ import org.apache.beam.sdk.values.PCollectionView;
* {@code PTransform}s for taking samples of the elements in a
* {@code PCollection}, or samples of the values associated with each
* key in a {@code PCollection} of {@code KV}s.
+ *
+ * <p>{@link #combineFn} can also be used manually, in combination with state and with the
+ * {@link Combine} transform.
*/
public class Sample {
+ /** Returns a {@link CombineFn} that computes a fixed-sized sample of its inputs. */
+ public static <T> CombineFn<T, ?, Iterable<T>> combineFn(int sampleSize) {
+ return new FixedSizedSampleFn<>(sampleSize);
+ }
+
/**
* {@code Sample#any(long)} takes a {@code PCollection<T>} and a limit, and
* produces a new {@code PCollection<T>} containing up to limit
@@ -64,72 +72,65 @@ public class Sample {
* @param limit the number of elements to take from the input
*/
public static <T> PTransform<PCollection<T>, PCollection<T>> any(long limit) {
- return new SampleAny<>(limit);
+ return new Any<>(limit);
}
/**
- * Returns a {@code PTransform} that takes a {@code PCollection<T>},
- * selects {@code sampleSize} elements, uniformly at random, and returns a
- * {@code PCollection<Iterable<T>>} containing the selected elements.
- * If the input {@code PCollection} has fewer than
- * {@code sampleSize} elements, then the output {@code Iterable<T>}
- * will be all the input's elements.
+ * Returns a {@code PTransform} that takes a {@code PCollection<T>}, selects {@code sampleSize}
+ * elements, uniformly at random, and returns a {@code PCollection<Iterable<T>>} containing the
+ * selected elements. If the input {@code PCollection} has fewer than {@code sampleSize} elements,
+ * then the output {@code Iterable<T>} will be all the input's elements.
*
* <p>Example of use:
- * <pre> {@code
+ *
+ * <pre>{@code
* PCollection<String> pc = ...;
* PCollection<Iterable<String>> sampleOfSize10 =
* pc.apply(Sample.fixedSizeGlobally(10));
- * } </pre>
+ * }
+ * </pre>
*
* @param sampleSize the number of elements to select; must be {@code >= 0}
* @param <T> the type of the elements
*/
- public static <T> PTransform<PCollection<T>, PCollection<Iterable<T>>>
- fixedSizeGlobally(int sampleSize) {
- return Combine.globally(new FixedSizedSampleFn<T>(sampleSize));
+ public static <T> PTransform<PCollection<T>, PCollection<Iterable<T>>> fixedSizeGlobally(
+ int sampleSize) {
+ return new FixedSizeGlobally<>(sampleSize);
}
/**
- * Returns a {@code PTransform} that takes an input
- * {@code PCollection<KV<K, V>>} and returns a
- * {@code PCollection<KV<K, Iterable<V>>>} that contains an output
- * element mapping each distinct key in the input
- * {@code PCollection} to a sample of {@code sampleSize} values
- * associated with that key in the input {@code PCollection}, taken
- * uniformly at random. If a key in the input {@code PCollection}
- * has fewer than {@code sampleSize} values associated with it, then
- * the output {@code Iterable<V>} associated with that key will be
- * all the values associated with that key in the input
- * {@code PCollection}.
+ * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, V>>} and returns a
+ * {@code PCollection<KV<K, Iterable<V>>>} that contains an output element mapping each distinct
+ * key in the input {@code PCollection} to a sample of {@code sampleSize} values associated with
+ * that key in the input {@code PCollection}, taken uniformly at random. If a key in the input
+ * {@code PCollection} has fewer than {@code sampleSize} values associated with it, then the
+ * output {@code Iterable<V>} associated with that key will be all the values associated with that
+ * key in the input {@code PCollection}.
*
* <p>Example of use:
- * <pre> {@code
+ *
+ * <pre>{@code
* PCollection<KV<String, Integer>> pc = ...;
* PCollection<KV<String, Iterable<Integer>>> sampleOfSize10PerKey =
* pc.apply(Sample.<String, Integer>fixedSizePerKey(10));
- * } </pre>
+ * }
+ * </pre>
*
- * @param sampleSize the number of values to select for each
- * distinct key; must be {@code >= 0}
+ * @param sampleSize the number of values to select for each distinct key; must be {@code >= 0}
* @param <K> the type of the keys
* @param <V> the type of the values
*/
- public static <K, V> PTransform<PCollection<KV<K, V>>,
- PCollection<KV<K, Iterable<V>>>>
- fixedSizePerKey(int sampleSize) {
- return Combine.perKey(new FixedSizedSampleFn<V>(sampleSize));
+ public static <K, V>
+ PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> fixedSizePerKey(
+ int sampleSize) {
+ return new FixedSizePerKey<>(sampleSize);
}
/////////////////////////////////////////////////////////////////////////////
- /**
- * A {@link PTransform} that takes a {@code PCollection<T>} and a limit, and
- * produces a new {@code PCollection<T>} containing up to limit
- * elements of the input {@code PCollection}.
- */
- public static class SampleAny<T> extends PTransform<PCollection<T>, PCollection<T>> {
+ /** Implementation of {@link #any(long)}. */
+ private static class Any<T> extends PTransform<PCollection<T>, PCollection<T>> {
private final long limit;
/**
@@ -137,7 +138,7 @@ public class Sample {
* produces a new PCollection containing up to {@code limit}
* elements of its input {@code PCollection}.
*/
- private SampleAny(long limit) {
+ private Any(long limit) {
checkArgument(limit >= 0, "Expected non-negative limit, received %s.", limit);
this.limit = limit;
}
@@ -162,6 +163,50 @@ public class Sample {
}
}
+ /** Implementation of {@link #fixedSizeGlobally(int)}. */
+ private static class FixedSizeGlobally<T>
+ extends PTransform<PCollection<T>, PCollection<Iterable<T>>> {
+ private final int sampleSize;
+
+ private FixedSizeGlobally(int sampleSize) {
+ this.sampleSize = sampleSize;
+ }
+
+ @Override
+ public PCollection<Iterable<T>> expand(PCollection<T> input) {
+ return input.apply(Combine.globally(new FixedSizedSampleFn<T>(sampleSize)));
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("sampleSize", sampleSize)
+ .withLabel("Sample Size"));
+ }
+ }
+
+ /** Implementation of {@link #fixedSizePerKey(int)}. */
+ private static class FixedSizePerKey<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+ private final int sampleSize;
+
+ private FixedSizePerKey(int sampleSize) {
+ this.sampleSize = sampleSize;
+ }
+
+ @Override
+ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+ return input.apply(Combine.<K, V, Iterable<V>>perKey(new FixedSizedSampleFn<V>(sampleSize)));
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("sampleSize", sampleSize)
+ .withLabel("Sample Size"));
+ }
+ }
+
/**
* A {@link DoFn} that returns up to limit elements from the side input PCollection.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/a8e2387d/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 6a6e0fc..1f9beb3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -34,8 +34,10 @@ import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Sample;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -62,7 +64,7 @@ public class TransformTreeTest {
enum TransformsSeen {
READ,
WRITE,
- SAMPLE_ANY
+ COMBINE_GLOBALLY
}
/**
@@ -120,7 +122,8 @@ public class TransformTreeTest {
File outputFile = tmpFolder.newFile();
p.apply("ReadMyFile", TextIO.Read.from(inputFile.getPath()))
- .apply(Sample.<String>any(10))
+ .apply(Combine.globally(Sample.<String>combineFn(10)))
+ .apply(Flatten.<String>iterables())
.apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
final EnumSet<TransformsSeen> visited =
@@ -132,8 +135,8 @@ public class TransformTreeTest {
@Override
public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
PTransform<?, ?> transform = node.getTransform();
- if (transform instanceof Sample.SampleAny) {
- assertTrue(visited.add(TransformsSeen.SAMPLE_ANY));
+ if (transform instanceof Combine.Globally) {
+ assertTrue(visited.add(TransformsSeen.COMBINE_GLOBALLY));
assertNotNull(node.getEnclosingNode());
assertTrue(node.isCompositeNode());
} else if (transform instanceof Write.Bound) {
@@ -148,8 +151,8 @@ public class TransformTreeTest {
@Override
public void leaveCompositeTransform(TransformHierarchy.Node node) {
PTransform<?, ?> transform = node.getTransform();
- if (transform instanceof Sample.SampleAny) {
- assertTrue(left.add(TransformsSeen.SAMPLE_ANY));
+ if (transform instanceof Combine.Globally) {
+ assertTrue(left.add(TransformsSeen.COMBINE_GLOBALLY));
}
}
@@ -157,7 +160,7 @@ public class TransformTreeTest {
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
PTransform<?, ?> transform = node.getTransform();
// Composite transforms should not be visited here as primitives.
- assertThat(transform, not(instanceOf(Sample.SampleAny.class)));
+ assertThat(transform, not(instanceOf(Combine.Globally.class)));
assertThat(transform, not(instanceOf(Write.Bound.class)));
if (transform instanceof Read.Bounded
&& node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
@@ -167,7 +170,7 @@ public class TransformTreeTest {
});
assertTrue(visited.equals(EnumSet.allOf(TransformsSeen.class)));
- assertTrue(left.equals(EnumSet.of(TransformsSeen.SAMPLE_ANY)));
+ assertTrue(left.equals(EnumSet.of(TransformsSeen.COMBINE_GLOBALLY)));
}
@Test(expected = IllegalArgumentException.class)
http://git-wip-us.apache.org/repos/asf/beam/blob/a8e2387d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 88b0145..8e426c6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -318,7 +318,7 @@ public class SampleTest {
@Test
public void testSampleGetName() {
- assertEquals("Sample.SampleAny", Sample.<String>any(1).getName());
+ assertEquals("Sample.Any", Sample.<String>any(1).getName());
}
@Test