You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/21 01:41:48 UTC
[1/3] incubator-beam git commit: Unabbreviate some variable names
Repository: incubator-beam
Updated Branches:
refs/heads/master 4f8cb0344 -> 506023bf5
Unabbreviate some variable names
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4c9dd225
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4c9dd225
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4c9dd225
Branch: refs/heads/master
Commit: 4c9dd22501ff931e19f41661018795d328b31295
Parents: fdec569
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Apr 20 12:58:08 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Apr 20 12:58:08 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/CombineTest.java | 92 ++++++++++----------
1 file changed, 46 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c9dd225/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index b710641..38961ed 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -124,8 +124,8 @@ public class CombineTest implements Serializable {
private void runTestSimpleCombine(KV<String, Integer>[] table,
int globalSum,
KV<String, String>[] perKeyCombines) {
- Pipeline p = TestPipeline.create();
- PCollection<KV<String, Integer>> input = createInput(p, table);
+ Pipeline pipeline = TestPipeline.create();
+ PCollection<KV<String, Integer>> input = createInput(pipeline, table);
PCollection<Integer> sum = input
.apply(Values.<Integer>create())
@@ -138,15 +138,15 @@ public class CombineTest implements Serializable {
PAssert.that(sum).containsInAnyOrder(globalSum);
PAssert.that(sumPerKey).containsInAnyOrder(perKeyCombines);
- p.run();
+ pipeline.run();
}
private void runTestSimpleCombineWithContext(KV<String, Integer>[] table,
int globalSum,
KV<String, String>[] perKeyCombines,
String[] globallyCombines) {
- Pipeline p = TestPipeline.create();
- PCollection<KV<String, Integer>> perKeyInput = createInput(p, table);
+ Pipeline pipeline = TestPipeline.create();
+ PCollection<KV<String, Integer>> perKeyInput = createInput(pipeline, table);
PCollection<Integer> globallyInput = perKeyInput.apply(Values.<Integer>create());
PCollection<Integer> sum = globallyInput.apply("Sum", Combine.globally(new SumInts()));
@@ -168,7 +168,7 @@ public class CombineTest implements Serializable {
PAssert.that(combinePerKey).containsInAnyOrder(perKeyCombines);
PAssert.that(combineGlobally).containsInAnyOrder(globallyCombines);
- p.run();
+ pipeline.run();
}
@Test
@@ -206,9 +206,9 @@ public class CombineTest implements Serializable {
private void runTestBasicCombine(KV<String, Integer>[] table,
Set<Integer> globalUnique,
KV<String, Set<Integer>>[] perKeyUnique) {
- Pipeline p = TestPipeline.create();
- p.getCoderRegistry().registerCoder(Set.class, SetCoder.class);
- PCollection<KV<String, Integer>> input = createInput(p, table);
+ Pipeline pipeline = TestPipeline.create();
+ pipeline.getCoderRegistry().registerCoder(Set.class, SetCoder.class);
+ PCollection<KV<String, Integer>> input = createInput(pipeline, table);
PCollection<Set<Integer>> unique = input
.apply(Values.<Integer>create())
@@ -221,7 +221,7 @@ public class CombineTest implements Serializable {
PAssert.that(unique).containsInAnyOrder(globalUnique);
PAssert.that(uniquePerKey).containsInAnyOrder(perKeyUnique);
- p.run();
+ pipeline.run();
}
@Test
@@ -243,8 +243,8 @@ public class CombineTest implements Serializable {
private void runTestAccumulatingCombine(KV<String, Integer>[] table,
Double globalMean,
KV<String, Double>[] perKeyMeans) {
- Pipeline p = TestPipeline.create();
- PCollection<KV<String, Integer>> input = createInput(p, table);
+ Pipeline pipeline = TestPipeline.create();
+ PCollection<KV<String, Integer>> input = createInput(pipeline, table);
PCollection<Double> mean = input
.apply(Values.<Integer>create())
@@ -257,16 +257,16 @@ public class CombineTest implements Serializable {
PAssert.that(mean).containsInAnyOrder(globalMean);
PAssert.that(meanPerKey).containsInAnyOrder(perKeyMeans);
- p.run();
+ pipeline.run();
}
@Test
@Category(RunnableOnService.class)
public void testFixedWindowsCombine() {
- Pipeline p = TestPipeline.create();
+ Pipeline pipeline = TestPipeline.create();
PCollection<KV<String, Integer>> input =
- p.apply(Create.timestamped(Arrays.asList(TABLE),
+ pipeline.apply(Create.timestamped(Arrays.asList(TABLE),
Arrays.asList(0L, 1L, 6L, 7L, 8L))
.withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
.apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(2))));
@@ -284,16 +284,16 @@ public class CombineTest implements Serializable {
KV.of("a", "4a"),
KV.of("b", "1b"),
KV.of("b", "13b"));
- p.run();
+ pipeline.run();
}
@Test
@Category(RunnableOnService.class)
public void testFixedWindowsCombineWithContext() {
- Pipeline p = TestPipeline.create();
+ Pipeline pipeline = TestPipeline.create();
PCollection<KV<String, Integer>> perKeyInput =
- p.apply(Create.timestamped(Arrays.asList(TABLE),
+ pipeline.apply(Create.timestamped(Arrays.asList(TABLE),
Arrays.asList(0L, 1L, 6L, 7L, 8L))
.withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
.apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(2))));
@@ -322,16 +322,16 @@ public class CombineTest implements Serializable {
KV.of("b", "15b"),
KV.of("b", "1133b"));
PAssert.that(combineGloballyWithContext).containsInAnyOrder("112G", "145G", "1133G");
- p.run();
+ pipeline.run();
}
@Test
@Category(RunnableOnService.class)
public void testSlidingWindowsCombineWithContext() {
- Pipeline p = TestPipeline.create();
+ Pipeline pipeline = TestPipeline.create();
PCollection<KV<String, Integer>> perKeyInput =
- p.apply(Create.timestamped(Arrays.asList(TABLE),
+ pipeline.apply(Create.timestamped(Arrays.asList(TABLE),
Arrays.asList(2L, 3L, 8L, 9L, 10L))
.withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
.apply(Window.<KV<String, Integer>>into(SlidingWindows.of(Duration.millis(2))));
@@ -365,7 +365,7 @@ public class CombineTest implements Serializable {
KV.of("b", "1133b"));
PAssert.that(combineGloballyWithContext).containsInAnyOrder(
"11G", "112G", "11G", "44G", "145G", "11134G", "1133G");
- p.run();
+ pipeline.run();
}
private static class FormatPaneInfo extends DoFn<Integer, String> {
@@ -378,8 +378,8 @@ public class CombineTest implements Serializable {
@Test
@Category(RunnableOnService.class)
public void testGlobalCombineWithDefaultsAndTriggers() {
- Pipeline p = TestPipeline.create();
- PCollection<Integer> input = p.apply(Create.of(1, 1));
+ Pipeline pipeline = TestPipeline.create();
+ PCollection<Integer> input = pipeline.apply(Create.of(1, 1));
PCollection<String> output = input
.apply(Window.<Integer>into(new GlobalWindows())
@@ -395,10 +395,10 @@ public class CombineTest implements Serializable {
@Test
@Category(RunnableOnService.class)
public void testSessionsCombine() {
- Pipeline p = TestPipeline.create();
+ Pipeline pipeline = TestPipeline.create();
PCollection<KV<String, Integer>> input =
- p.apply(Create.timestamped(Arrays.asList(TABLE),
+ pipeline.apply(Create.timestamped(Arrays.asList(TABLE),
Arrays.asList(0L, 4L, 7L, 10L, 16L))
.withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
.apply(Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.millis(5))));
@@ -415,16 +415,16 @@ public class CombineTest implements Serializable {
KV.of("a", "114a"),
KV.of("b", "1b"),
KV.of("b", "13b"));
- p.run();
+ pipeline.run();
}
@Test
@Category(RunnableOnService.class)
public void testSessionsCombineWithContext() {
- Pipeline p = TestPipeline.create();
+ Pipeline pipeline = TestPipeline.create();
PCollection<KV<String, Integer>> perKeyInput =
- p.apply(Create.timestamped(Arrays.asList(TABLE),
+ pipeline.apply(Create.timestamped(Arrays.asList(TABLE),
Arrays.asList(0L, 4L, 7L, 10L, 16L))
.withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
@@ -458,22 +458,22 @@ public class CombineTest implements Serializable {
KV.of("b", "11b"),
KV.of("b", "013b"));
PAssert.that(sessionsCombineGlobally).containsInAnyOrder("11114G", "013G");
- p.run();
+ pipeline.run();
}
@Test
@Category(RunnableOnService.class)
public void testWindowedCombineEmpty() {
- Pipeline p = TestPipeline.create();
+ Pipeline pipeline = TestPipeline.create();
- PCollection<Double> mean = p
+ PCollection<Double> mean = pipeline
.apply(Create.<Integer>of().withCoder(BigEndianIntegerCoder.of()))
.apply(Window.<Integer>into(FixedWindows.of(Duration.millis(1))))
.apply(Combine.globally(new MeanInts()).withoutDefaults());
PAssert.that(mean).empty();
- p.run();
+ pipeline.run();
}
@Test
@@ -541,8 +541,8 @@ public class CombineTest implements Serializable {
@Test
@Category(RunnableOnService.class)
public void testHotKeyCombining() {
- Pipeline p = TestPipeline.create();
- PCollection<KV<String, Integer>> input = copy(createInput(p, TABLE), 10);
+ Pipeline pipeline = TestPipeline.create();
+ PCollection<KV<String, Integer>> input = copy(createInput(pipeline, TABLE), 10);
KeyedCombineFn<String, Integer, ?, Double> mean =
new MeanInts().<String>asKeyedFn();
@@ -561,7 +561,7 @@ public class CombineTest implements Serializable {
PAssert.that(hotMean).containsInAnyOrder(expected);
PAssert.that(splitMean).containsInAnyOrder(expected);
- p.run();
+ pipeline.run();
}
private static class GetLast extends DoFn<Integer, Integer> {
@@ -576,8 +576,8 @@ public class CombineTest implements Serializable {
@Test
@Category(RunnableOnService.class)
public void testHotKeyCombiningWithAccumulationMode() {
- Pipeline p = TestPipeline.create();
- PCollection<Integer> input = p.apply(Create.of(1, 2, 3, 4, 5));
+ Pipeline pipeline = TestPipeline.create();
+ PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5));
PCollection<Integer> output = input
.apply(Window.<Integer>into(new GlobalWindows())
@@ -589,13 +589,13 @@ public class CombineTest implements Serializable {
PAssert.that(output).containsInAnyOrder(15);
- p.run();
+ pipeline.run();
}
@Test
public void testBinaryCombineFn() {
- Pipeline p = TestPipeline.create();
- PCollection<KV<String, Integer>> input = copy(createInput(p, TABLE), 2);
+ Pipeline pipeline = TestPipeline.create();
+ PCollection<KV<String, Integer>> input = copy(createInput(pipeline, TABLE), 2);
PCollection<KV<String, Integer>> intProduct = input
.apply("IntProduct", Combine.<String, Integer, Integer>perKey(new TestProdInt()));
PCollection<KV<String, Integer>> objProduct = input
@@ -605,7 +605,7 @@ public class CombineTest implements Serializable {
PAssert.that(intProduct).containsInAnyOrder(expected);
PAssert.that(objProduct).containsInAnyOrder(expected);
- p.run();
+ pipeline.run();
}
@Test
@@ -649,12 +649,12 @@ public class CombineTest implements Serializable {
@Test
@Category(RunnableOnService.class)
public void testCombineGloballyAsSingletonView() {
- Pipeline p = TestPipeline.create();
- final PCollectionView<Integer> view = p
+ Pipeline pipeline = TestPipeline.create();
+ final PCollectionView<Integer> view = pipeline
.apply("CreateEmptySideInput", Create.<Integer>of().withCoder(BigEndianIntegerCoder.of()))
.apply(Sum.integersGlobally().asSingletonView());
- PCollection<Integer> output = p
+ PCollection<Integer> output = pipeline
.apply("CreateVoidMainInput", Create.of((Void) null))
.apply("OutputSideInput", ParDo.of(new DoFn<Void, Integer>() {
@Override
@@ -664,7 +664,7 @@ public class CombineTest implements Serializable {
}).withSideInputs(view));
PAssert.thatSingleton(output).isEqualTo(0);
- p.run();
+ pipeline.run();
}
@Test
[3/3] incubator-beam git commit: This closes #218
Posted by ke...@apache.org.
This closes #218
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/506023bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/506023bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/506023bf
Branch: refs/heads/master
Commit: 506023bf53a78ba38c87dd3bddcdc069461f22ea
Parents: 4f8cb03 aecf706
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Apr 20 16:41:30 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Apr 20 16:41:30 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/CombineTest.java | 106 +++++++++++--------
1 file changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Fix erroneous test in CombineTest
Posted by ke...@apache.org.
Fix erroneous test in CombineTest
The modified test never called pipeline.run(), so it was never executed and we never
learned that its assertion was incorrect. This change runs the pipeline and fixes the assertion.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aecf7069
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aecf7069
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aecf7069
Branch: refs/heads/master
Commit: aecf7069a1a38845a883a226a1547320e5ae51af
Parents: 4c9dd22
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Apr 20 13:12:48 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Apr 20 16:41:14 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/CombineTest.java | 14 +++++++++++++-
1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aecf7069/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 38961ed..9fa148e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -20,8 +20,10 @@ package org.apache.beam.sdk.transforms;
import static org.apache.beam.sdk.TestUtils.checkCombineFn;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes;
+
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -389,7 +391,17 @@ public class CombineTest implements Serializable {
.apply(Sum.integersGlobally())
.apply(ParDo.of(new FormatPaneInfo()));
- PAssert.that(output).containsInAnyOrder("1: false", "2: true");
+ // The actual elements produced are nondeterministic. Could be one, could be two.
+ // But it should certainly have a final element with the correct final sum.
+ PAssert.that(output).satisfies(new SerializableFunction<Iterable<String>, Void>() {
+ @Override
+ public Void apply(Iterable<String> input) {
+ assertThat(input, hasItem("2: true"));
+ return null;
+ }
+ });
+
+ pipeline.run();
}
@Test