You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2020/12/30 02:02:07 UTC
[beam] branch master updated: BEAM-11536. Test
"beam:window_fn:serialized_java:v1" in WindowStrategyTranslation
This is an automated email from the ASF dual-hosted git repository.
amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0c84a51 BEAM-11536. Test "beam:window_fn:serialized_java:v1" in WindowStrategyTranslation
new 5e17b69 Merge pull request #13630 from amaliujia/BEAM-11536
0c84a51 is described below
commit 0c84a5140e8b3248911a8620740a31befd4ccb55
Author: amaliujia <am...@163.com>
AuthorDate: Mon Dec 28 23:13:12 2020 -0800
BEAM-11536. Test "beam:window_fn:serialized_java:v1" in WindowStrategyTranslation
---
.../WindowingStrategyTranslationTest.java | 159 ++++++++++++++++++++-
1 file changed, 158 insertions(+), 1 deletion(-)
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
index f68ed2e..d5d1a11 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
@@ -21,19 +21,37 @@ import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -98,7 +116,8 @@ public class WindowingStrategyTranslationTest {
.withMode(AccumulationMode.RETRACTING_FIRED_PANES)
.withTrigger(REPRESENTATIVE_TRIGGER)
.withAllowedLateness(Duration.millis(100))
- .withTimestampCombiner(TimestampCombiner.LATEST)));
+ .withTimestampCombiner(TimestampCombiner.LATEST)),
+ toProtoAndBackSpec(WindowingStrategy.of(new CustomWindowFn())));
}
@Parameter(0)
@@ -143,4 +162,142 @@ public class WindowingStrategyTranslationTest {
proto.getAssignsToOneWindow(),
equalTo(windowingStrategy.getWindowFn().assignsToOneWindow()));
}
+
+  /**
+   * An {@link IntervalWindow} that additionally records whether it is a "big" window.
+   *
+   * <p>The flag takes part in {@link #equals(Object)} and {@link #hashCode()}, so two windows
+   * covering the same interval but carrying different flags are distinct.
+   */
+  private static class CustomWindow extends IntervalWindow {
+    // Final because the flag feeds equals()/hashCode() and instances are used as HashMap keys and
+    // HashSet members in CustomWindowFn#mergeWindows; mutating it would corrupt those collections.
+    private final boolean isBig;
+
+    /**
+     * Creates a window over the given interval.
+     *
+     * @param start start of the interval, passed to the {@link IntervalWindow} constructor
+     * @param end end of the interval, passed to the {@link IntervalWindow} constructor
+     * @param isBig whether this window is flagged as big
+     */
+    CustomWindow(Instant start, Instant end, boolean isBig) {
+      super(start, end);
+      this.isBig = isBig;
+    }
+
+    /** Equal when the other object has the same class, the same interval and the same flag. */
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      CustomWindow that = (CustomWindow) o;
+      return super.equals(o) && this.isBig == that.isBig;
+    }
+
+    /** Combines the superclass hash with the flag, consistent with {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), isBig);
+    }
+  }
+
+  /**
+   * Coder for {@link CustomWindow}. The encoded form is the interval, written with the {@link
+   * IntervalWindow} coder, followed by the {@code isBig} flag written as a var-int (1 when set, 0
+   * otherwise).
+   */
+  private static class CustomWindowCoder extends CustomCoder<CustomWindow> {
+
+    private static final CustomWindowCoder INSTANCE = new CustomWindowCoder();
+    private static final Coder<IntervalWindow> INTERVAL_WINDOW_CODER = IntervalWindow.getCoder();
+    private static final VarIntCoder VAR_INT_CODER = VarIntCoder.of();
+
+    /** Returns the shared coder instance. */
+    public static CustomWindowCoder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(CustomWindow window, OutputStream outStream) throws IOException {
+      // Interval first, flag second; decode() reads them back in the same order.
+      INTERVAL_WINDOW_CODER.encode(window, outStream);
+      int bigFlag = 0;
+      if (window.isBig) {
+        bigFlag = 1;
+      }
+      VAR_INT_CODER.encode(bigFlag, outStream);
+    }
+
+    @Override
+    public CustomWindow decode(InputStream inStream) throws IOException {
+      IntervalWindow interval = INTERVAL_WINDOW_CODER.decode(inStream);
+      int bigFlag = VAR_INT_CODER.decode(inStream);
+      // Any non-zero value is read as "big".
+      return new CustomWindow(interval.start(), interval.end(), bigFlag != 0);
+    }
+
+    /** Deterministic exactly when both component coders report themselves deterministic. */
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      INTERVAL_WINDOW_CODER.verifyDeterministic();
+      VAR_INT_CODER.verifyDeterministic();
+    }
+  }
+
+ private static class CustomWindowFn<T> extends WindowFn<T, CustomWindow> {
+ @Override
+ public Collection<CustomWindow> assignWindows(AssignContext c) throws Exception {
+ String element;
+ // It loses genericity of type T but this is not a big deal for a test.
+ // And it allows to avoid duplicating CustomWindowFn to support PCollection<KV>
+ if (c.element() instanceof KV) {
+ element = ((KV<Integer, String>) c.element()).getValue();
+ } else {
+ element = (String) c.element();
+ }
+ // put big elements in windows of 30s and small ones in windows of 5s
+ if ("big".equals(element)) {
+ return Collections.singletonList(
+ new CustomWindow(
+ c.timestamp(), c.timestamp().plus(Duration.standardSeconds(30)), true));
+ } else {
+ return Collections.singletonList(
+ new CustomWindow(
+ c.timestamp(), c.timestamp().plus(Duration.standardSeconds(5)), false));
+ }
+ }
+
+ @Override
+ public void mergeWindows(MergeContext c) throws Exception {
+ Map<CustomWindow, Set<CustomWindow>> windowsToMerge = new HashMap<>();
+ for (CustomWindow window : c.windows()) {
+ if (window.isBig) {
+ HashSet<CustomWindow> windows = new HashSet<>();
+ windows.add(window);
+ windowsToMerge.put(window, windows);
+ }
+ }
+ for (CustomWindow window : c.windows()) {
+ for (Map.Entry<CustomWindow, Set<CustomWindow>> bigWindow : windowsToMerge.entrySet()) {
+ if (bigWindow.getKey().contains(window)) {
+ bigWindow.getValue().add(window);
+ }
+ }
+ }
+ for (Map.Entry<CustomWindow, Set<CustomWindow>> mergeEntry : windowsToMerge.entrySet()) {
+ c.merge(mergeEntry.getValue(), mergeEntry.getKey());
+ }
+ }
+
+ @Override
+ public boolean isCompatible(WindowFn<?, ?> other) {
+ return other instanceof CustomWindowFn;
+ }
+
+ @Override
+ public Coder<CustomWindow> windowCoder() {
+ return CustomWindowCoder.of();
+ }
+
+ @Override
+ public WindowMappingFn<CustomWindow> getDefaultWindowMappingFn() {
+ throw new UnsupportedOperationException("side inputs not supported");
+ }
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ CustomWindowFn windowFn = (CustomWindowFn) o;
+ if (this.isCompatible(windowFn)) {
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ // overriding hashCode() is required but it is not useful in terms of
+ // writting test cases.
+ return Objects.hash("test");
+ }
+ }
}