You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/30 21:31:23 UTC
[21/50] beam git commit: Add a Combine Test for Sliding Windows without Context
Add a Combine Test for Sliding Windows without Context
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6ade8426
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6ade8426
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6ade8426
Branch: refs/heads/gearpump-runner
Commit: 6ade8426edc2ace1a9bec8f9501d8dad17e91365
Parents: 1e16aa2
Author: Thomas Groh <tg...@google.com>
Authored: Wed Jun 28 12:51:31 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Jun 28 12:52:37 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/CombineTest.java | 63 ++++++++++++++++++++
1 file changed, 63 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6ade8426/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index e2469ab..b24d82d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -29,11 +29,13 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -325,6 +327,67 @@ public class CombineTest implements Serializable {
@Test
@Category(ValidatesRunner.class)
+ public void testSlidingWindowsCombine() { // global combine over 3ms windows sliding every 1ms
+ PCollection<String> input =
+ pipeline
+ .apply(
+ Create.timestamped(
+ TimestampedValue.of("a", new Instant(1L)),
+ TimestampedValue.of("b", new Instant(2L)),
+ TimestampedValue.of("c", new Instant(3L))))
+ .apply(
+ Window.<String>into(
+ SlidingWindows.of(Duration.millis(3)).every(Duration.millis(1L))));
+ PCollection<List<String>> combined =
+ input.apply(
+ Combine.globally(
+ new CombineFn<String, List<String>, List<String>>() {
+ @Override
+ public List<String> createAccumulator() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public List<String> addInput(List<String> accumulator, String input) {
+ accumulator.add(input);
+ return accumulator;
+ }
+
+ @Override
+ public List<String> mergeAccumulators(Iterable<List<String>> accumulators) {
+ // Mutate all of the accumulators on purpose: each one receives the running
+ // result's elements and then becomes the running result itself. Instances
+ // should be used in only one place, and not reused after merging.
+ List<String> cur = createAccumulator();
+ for (List<String> accumulator : accumulators) {
+ accumulator.addAll(cur);
+ cur = accumulator;
+ }
+ return cur;
+ }
+
+ @Override
+ public List<String> extractOutput(List<String> accumulator) {
+ List<String> result = new ArrayList<>(accumulator);
+ Collections.sort(result); // sort the copy so the output does not depend on merge order
+ return result;
+ }
+ })
+ .withoutDefaults());
+
+ PAssert.that(combined)
+ .containsInAnyOrder( // one sorted list for each sliding window that holds an element
+ ImmutableList.of("a"),
+ ImmutableList.of("a", "b"),
+ ImmutableList.of("a", "b", "c"),
+ ImmutableList.of("b", "c"),
+ ImmutableList.of("c"));
+
+ pipeline.run();
+ }
+
+ @Test
+ @Category(ValidatesRunner.class)
public void testSlidingWindowsCombineWithContext() {
// [a: 1, 1], [a: 4; b: 1], [b: 13]
PCollection<KV<String, Integer>> perKeyInput =