You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2020/12/30 02:02:07 UTC

[beam] branch master updated: BEAM-11536. Test "beam:window_fn:serialized_java:v1" in WindowStrategyTranslation

This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c84a51  BEAM-11536. Test "beam:window_fn:serialized_java:v1" in WindowStrategyTranslation
     new 5e17b69  Merge pull request #13630 from amaliujia/BEAM-11536
0c84a51 is described below

commit 0c84a5140e8b3248911a8620740a31befd4ccb55
Author: amaliujia <am...@163.com>
AuthorDate: Mon Dec 28 23:13:12 2020 -0800

    BEAM-11536. Test "beam:window_fn:serialized_java:v1" in WindowStrategyTranslation
---
 .../WindowingStrategyTranslationTest.java          | 159 ++++++++++++++++++++-
 1 file changed, 158 insertions(+), 1 deletion(-)

diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
index f68ed2e..d5d1a11 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
@@ -21,19 +21,37 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -98,7 +116,8 @@ public class WindowingStrategyTranslationTest {
                 .withMode(AccumulationMode.RETRACTING_FIRED_PANES)
                 .withTrigger(REPRESENTATIVE_TRIGGER)
                 .withAllowedLateness(Duration.millis(100))
-                .withTimestampCombiner(TimestampCombiner.LATEST)));
+                .withTimestampCombiner(TimestampCombiner.LATEST)),
+        toProtoAndBackSpec(WindowingStrategy.of(new CustomWindowFn())));
   }
 
   @Parameter(0)
@@ -143,4 +162,142 @@ public class WindowingStrategyTranslationTest {
         proto.getAssignsToOneWindow(),
         equalTo(windowingStrategy.getWindowFn().assignsToOneWindow()));
   }
+
+  private static class CustomWindow extends IntervalWindow {
+    private boolean isBig;
+
+    CustomWindow(Instant start, Instant end, boolean isBig) {
+      super(start, end);
+      this.isBig = isBig;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      CustomWindow that = (CustomWindow) o;
+      return super.equals(o) && this.isBig == that.isBig;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), isBig);
+    }
+  }
+
+  private static class CustomWindowCoder extends CustomCoder<CustomWindow> {
+
+    private static final CustomWindowCoder INSTANCE = new CustomWindowCoder();
+    private static final Coder<IntervalWindow> INTERVAL_WINDOW_CODER = IntervalWindow.getCoder();
+    private static final VarIntCoder VAR_INT_CODER = VarIntCoder.of();
+
+    public static CustomWindowCoder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(CustomWindow window, OutputStream outStream) throws IOException {
+      INTERVAL_WINDOW_CODER.encode(window, outStream);
+      VAR_INT_CODER.encode(window.isBig ? 1 : 0, outStream);
+    }
+
+    @Override
+    public CustomWindow decode(InputStream inStream) throws IOException {
+      IntervalWindow superWindow = INTERVAL_WINDOW_CODER.decode(inStream);
+      boolean isBig = VAR_INT_CODER.decode(inStream) != 0;
+      return new CustomWindow(superWindow.start(), superWindow.end(), isBig);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      INTERVAL_WINDOW_CODER.verifyDeterministic();
+      VAR_INT_CODER.verifyDeterministic();
+    }
+  }
+
+  private static class CustomWindowFn<T> extends WindowFn<T, CustomWindow> {
+    @Override
+    public Collection<CustomWindow> assignWindows(AssignContext c) throws Exception {
+      String element;
+      // This loses the genericity of type T, but that is not a big deal for a test,
+      // and it avoids duplicating CustomWindowFn to support PCollection<KV>.
+      if (c.element() instanceof KV) {
+        element = ((KV<Integer, String>) c.element()).getValue();
+      } else {
+        element = (String) c.element();
+      }
+      // put big elements in windows of 30s and small ones in windows of 5s
+      if ("big".equals(element)) {
+        return Collections.singletonList(
+            new CustomWindow(
+                c.timestamp(), c.timestamp().plus(Duration.standardSeconds(30)), true));
+      } else {
+        return Collections.singletonList(
+            new CustomWindow(
+                c.timestamp(), c.timestamp().plus(Duration.standardSeconds(5)), false));
+      }
+    }
+
+    @Override
+    public void mergeWindows(MergeContext c) throws Exception {
+      Map<CustomWindow, Set<CustomWindow>> windowsToMerge = new HashMap<>();
+      for (CustomWindow window : c.windows()) {
+        if (window.isBig) {
+          HashSet<CustomWindow> windows = new HashSet<>();
+          windows.add(window);
+          windowsToMerge.put(window, windows);
+        }
+      }
+      for (CustomWindow window : c.windows()) {
+        for (Map.Entry<CustomWindow, Set<CustomWindow>> bigWindow : windowsToMerge.entrySet()) {
+          if (bigWindow.getKey().contains(window)) {
+            bigWindow.getValue().add(window);
+          }
+        }
+      }
+      for (Map.Entry<CustomWindow, Set<CustomWindow>> mergeEntry : windowsToMerge.entrySet()) {
+        c.merge(mergeEntry.getValue(), mergeEntry.getKey());
+      }
+    }
+
+    @Override
+    public boolean isCompatible(WindowFn<?, ?> other) {
+      return other instanceof CustomWindowFn;
+    }
+
+    @Override
+    public Coder<CustomWindow> windowCoder() {
+      return CustomWindowCoder.of();
+    }
+
+    @Override
+    public WindowMappingFn<CustomWindow> getDefaultWindowMappingFn() {
+      throw new UnsupportedOperationException("side inputs not supported");
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      CustomWindowFn windowFn = (CustomWindowFn) o;
+      if (this.isCompatible(windowFn)) {
+        return true;
+      }
+
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      // Overriding hashCode() is required, but it is not useful in terms of
+      // writing test cases.
+      return Objects.hash("test");
+    }
+  }
 }