You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/30 00:20:06 UTC
[1/2] beam git commit: Add a Combine Test for Sliding Windows without
Context
Repository: beam
Updated Branches:
refs/heads/master 52cea71ed -> 7fc73d790
Add a Combine Test for Sliding Windows without Context
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6ade8426
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6ade8426
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6ade8426
Branch: refs/heads/master
Commit: 6ade8426edc2ace1a9bec8f9501d8dad17e91365
Parents: 1e16aa2
Author: Thomas Groh <tg...@google.com>
Authored: Wed Jun 28 12:51:31 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Jun 28 12:52:37 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/CombineTest.java | 63 ++++++++++++++++++++
1 file changed, 63 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6ade8426/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index e2469ab..b24d82d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -29,11 +29,13 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -325,6 +327,67 @@ public class CombineTest implements Serializable {
@Test
@Category(ValidatesRunner.class)
+ public void testSlidingWindowsCombine() {
+ PCollection<String> input =
+ pipeline
+ .apply(
+ Create.timestamped(
+ TimestampedValue.of("a", new Instant(1L)),
+ TimestampedValue.of("b", new Instant(2L)),
+ TimestampedValue.of("c", new Instant(3L))))
+ .apply(
+ Window.<String>into(
+ SlidingWindows.of(Duration.millis(3)).every(Duration.millis(1L))));
+ PCollection<List<String>> combined =
+ input.apply(
+ Combine.globally(
+ new CombineFn<String, List<String>, List<String>>() {
+ @Override
+ public List<String> createAccumulator() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public List<String> addInput(List<String> accumulator, String input) {
+ accumulator.add(input);
+ return accumulator;
+ }
+
+ @Override
+ public List<String> mergeAccumulators(Iterable<List<String>> accumulators) {
+ // Deliberately mutate every accumulator while merging: each one absorbs the
+ // running result and then becomes the running result itself. Accumulator
+ // instances should be used in only one place and not reused after merging.
+ List<String> cur = createAccumulator();
+ for (List<String> accumulator : accumulators) {
+ accumulator.addAll(cur);
+ cur = accumulator;
+ }
+ return cur;
+ }
+
+ @Override
+ public List<String> extractOutput(List<String> accumulator) {
+ List<String> result = new ArrayList<>(accumulator);
+ Collections.sort(result);
+ return result;
+ }
+ })
+ .withoutDefaults());
+
+ PAssert.that(combined)
+ .containsInAnyOrder(
+ ImmutableList.of("a"),
+ ImmutableList.of("a", "b"),
+ ImmutableList.of("a", "b", "c"),
+ ImmutableList.of("b", "c"),
+ ImmutableList.of("c"));
+
+ pipeline.run();
+ }
+
+ @Test
+ @Category(ValidatesRunner.class)
public void testSlidingWindowsCombineWithContext() {
// [a: 1, 1], [a: 4; b: 1], [b: 13]
PCollection<KV<String, Integer>> perKeyInput =
[2/2] beam git commit: This closes #3462: Add a Combine Test for
Sliding Windows without Context
Posted by ke...@apache.org.
This closes #3462: Add a Combine Test for Sliding Windows without Context
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7fc73d79
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7fc73d79
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7fc73d79
Branch: refs/heads/master
Commit: 7fc73d790803760c1a0684106be0c38126266598
Parents: 52cea71 6ade842
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 29 17:19:43 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 29 17:19:43 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/CombineTest.java | 63 ++++++++++++++++++++
1 file changed, 63 insertions(+)
----------------------------------------------------------------------