You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/08/10 16:50:11 UTC
[2/2] beam git commit: Update Custom Window Merging Tests
Update Custom Window Merging Tests
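Update CustomWindowFn.mergeWindows so that its merge result no longer depends on the order in which windows are visited: all big windows are collected first, and then every window contained in a big window is merged into it.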
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/24ee742d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/24ee742d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/24ee742d
Branch: refs/heads/master
Commit: 24ee742d5c2cc4082a8eecdc01d7b34d8fc8ef54
Parents: cd4f5a2
Author: Thomas Groh <tg...@google.com>
Authored: Mon Aug 7 15:05:59 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Aug 10 09:49:50 2017 -0700
----------------------------------------------------------------------
.../sdk/transforms/windowing/WindowTest.java | 125 +++++++++----------
1 file changed, 61 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/24ee742d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 5b6d046..e2f8c26 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -38,8 +38,12 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.Coder;
@@ -581,20 +585,23 @@ public class WindowTest implements Serializable {
assertThat(data, not(hasDisplayItem("trigger")));
assertThat(data, not(hasDisplayItem("allowedLateness")));
}
+
@Test
@Category({ValidatesRunner.class, UsesCustomWindowMerging.class})
public void testMergingCustomWindows() {
Instant startInstant = new Instant(0L);
List<TimestampedValue<String>> input = new ArrayList<>();
- input.add(TimestampedValue.of("big", startInstant.plus(Duration.standardSeconds(10))));
- input.add(TimestampedValue.of("small1", startInstant.plus(Duration.standardSeconds(20))));
- // This one will be outside of bigWindow thus not merged
- input.add(TimestampedValue.of("small2", startInstant.plus(Duration.standardSeconds(39))));
- PCollection<String> inputCollection = pipeline.apply(Create.timestamped(input));
- PCollection<String> windowedCollection = inputCollection
- .apply(Window.into(new CustomWindowFn<String>()));
- PCollection<Long> count = windowedCollection
- .apply(Combine.globally(Count.<String>combineFn()).withoutDefaults());
+ PCollection<String> inputCollection =
+ pipeline.apply(
+ Create.timestamped(
+ TimestampedValue.of("big", startInstant.plus(Duration.standardSeconds(10))),
+ TimestampedValue.of("small1", startInstant.plus(Duration.standardSeconds(20))),
+ // This one will be outside of bigWindow thus not merged
+ TimestampedValue.of("small2", startInstant.plus(Duration.standardSeconds(39)))));
+ PCollection<String> windowedCollection =
+ inputCollection.apply(Window.into(new CustomWindowFn<String>()));
+ PCollection<Long> count =
+ windowedCollection.apply(Combine.globally(Count.<String>combineFn()).withoutDefaults());
// "small1" and "big" elements merged into bigWindow "small2" not merged
// because timestamp is not in bigWindow
PAssert.that("Wrong number of elements in output collection", count).containsInAnyOrder(2L, 1L);
@@ -607,61 +614,54 @@ public class WindowTest implements Serializable {
@Category({ValidatesRunner.class, UsesCustomWindowMerging.class})
public void testMergingCustomWindowsKeyedCollection() {
Instant startInstant = new Instant(0L);
- List<TimestampedValue<KV<Integer, String>>> input = new ArrayList<>();
- input
- .add(TimestampedValue.of(KV.of(0, "big"), startInstant.plus(Duration.standardSeconds(10))));
- input.add(
- TimestampedValue.of(KV.of(1, "small1"), startInstant.plus(Duration.standardSeconds(20))));
- // This one will be outside of bigWindow thus not merged
- input.add(
- TimestampedValue.of(KV.of(2, "small2"), startInstant.plus(Duration.standardSeconds(39))));
- PCollection<KV<Integer, String>> inputCollection = pipeline.apply(Create.timestamped(input));
- PCollection<KV<Integer, String>> windowedCollection = inputCollection
- .apply(Window.into(new CustomWindowFn<KV<Integer, String>>()));
- PCollection<Long> count = windowedCollection
- .apply(Combine.globally(Count.<KV<Integer, String>>combineFn()).withoutDefaults());
+ PCollection<KV<Integer, String>> inputCollection =
+ pipeline.apply(
+ Create.timestamped(
+ TimestampedValue.of(
+ KV.of(0, "big"), startInstant.plus(Duration.standardSeconds(10))),
+ TimestampedValue.of(
+ KV.of(1, "small1"), startInstant.plus(Duration.standardSeconds(20))),
+ // This element is not contained within the bigWindow and not merged
+ TimestampedValue.of(
+ KV.of(2, "small2"), startInstant.plus(Duration.standardSeconds(39)))));
+ PCollection<KV<Integer, String>> windowedCollection =
+ inputCollection.apply(Window.into(new CustomWindowFn<KV<Integer, String>>()));
+ PCollection<Long> count =
+ windowedCollection.apply(
+ Combine.globally(Count.<KV<Integer, String>>combineFn()).withoutDefaults());
// "small1" and "big" elements merged into bigWindow "small2" not merged
- // because timestamp is not in bigWindow
+ // because it is not contained in bigWindow
PAssert.that("Wrong number of elements in output collection", count).containsInAnyOrder(2L, 1L);
pipeline.run();
}
private static class CustomWindow extends IntervalWindow {
-
private boolean isBig;
-
- CustomWindow(Instant start, Instant end) {
- super(start, end);
- this.isBig = false;
- }
-
CustomWindow(Instant start, Instant end, boolean isBig) {
super(start, end);
this.isBig = isBig;
}
- @Override public boolean equals(Object o) {
+ @Override
+ public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
- if (!super.equals(o)) {
- return false;
- }
CustomWindow that = (CustomWindow) o;
- return isBig == that.isBig;
+ return super.equals(o) && this.isBig == that.isBig;
}
- @Override public int hashCode() {
+ @Override
+ public int hashCode() {
return Objects.hash(super.hashCode(), isBig);
}
}
- private static class CustomWindowCoder extends
- CustomCoder<CustomWindow> {
+ private static class CustomWindowCoder extends CustomCoder<CustomWindow> {
private static final CustomWindowCoder INSTANCE = new CustomWindowCoder();
private static final Coder<IntervalWindow> INTERVAL_WINDOW_CODER = IntervalWindow.getCoder();
@@ -672,8 +672,7 @@ public class WindowTest implements Serializable {
}
@Override
- public void encode(CustomWindow window, OutputStream outStream)
- throws IOException {
+ public void encode(CustomWindow window, OutputStream outStream) throws IOException {
INTERVAL_WINDOW_CODER.encode(window, outStream);
VAR_INT_CODER.encode(window.isBig ? 1 : 0, outStream);
}
@@ -693,12 +692,12 @@ public class WindowTest implements Serializable {
}
private static class CustomWindowFn<T> extends WindowFn<T, CustomWindow> {
-
- @Override public Collection<CustomWindow> assignWindows(AssignContext c) throws Exception {
+ @Override
+ public Collection<CustomWindow> assignWindows(AssignContext c) throws Exception {
String element;
// It loses genericity of type T but this is not a big deal for a test.
// And it allows to avoid duplicating CustomWindowFn to support PCollection<KV>
- if (c.element() instanceof KV){
+ if (c.element() instanceof KV) {
element = ((KV<Integer, String>) c.element()).getValue();
} else {
element = (String) c.element();
@@ -706,33 +705,34 @@ public class WindowTest implements Serializable {
// put big elements in windows of 30s and small ones in windows of 5s
if ("big".equals(element)) {
return Collections.singletonList(
- new CustomWindow(c.timestamp(), c.timestamp().plus(Duration.standardSeconds(30)),
- true));
+ new CustomWindow(
+ c.timestamp(), c.timestamp().plus(Duration.standardSeconds(30)), true));
} else {
return Collections.singletonList(
- new CustomWindow(c.timestamp(), c.timestamp().plus(Duration.standardSeconds(5)),
- false));
+ new CustomWindow(
+ c.timestamp(), c.timestamp().plus(Duration.standardSeconds(5)), false));
}
}
@Override
public void mergeWindows(MergeContext c) throws Exception {
- List<CustomWindow> toBeMerged = new ArrayList<>();
- CustomWindow bigWindow = null;
- for (CustomWindow customWindow : c.windows()) {
- if (customWindow.isBig) {
- bigWindow = customWindow;
- toBeMerged.add(customWindow);
- } else if (bigWindow != null
- && customWindow.start().isAfter(bigWindow.start())
- && customWindow.end().isBefore(bigWindow.end())) {
- toBeMerged.add(customWindow);
+ Map<CustomWindow, Set<CustomWindow>> windowsToMerge = new HashMap<>();
+ for (CustomWindow window : c.windows()) {
+ if (window.isBig) {
+ HashSet<CustomWindow> windows = new HashSet<>();
+ windows.add(window);
+ windowsToMerge.put(window, windows);
+ }
+ }
+ for (CustomWindow window : c.windows()) {
+ for (Map.Entry<CustomWindow, Set<CustomWindow>> bigWindow : windowsToMerge.entrySet()) {
+ if (bigWindow.getKey().contains(window)) {
+ bigWindow.getValue().add(window);
+ }
}
}
- // in case bigWindow has not been seen yet
- if (bigWindow != null) {
- // merge small windows into big windows
- c.merge(toBeMerged, bigWindow);
+ for (Map.Entry<CustomWindow, Set<CustomWindow>> mergeEntry : windowsToMerge.entrySet()) {
+ c.merge(mergeEntry.getValue(), mergeEntry.getKey());
}
}
@@ -750,8 +750,5 @@ public class WindowTest implements Serializable {
public WindowMappingFn<CustomWindow> getDefaultWindowMappingFn() {
throw new UnsupportedOperationException("side inputs not supported");
}
-
-
}
-
}