You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/08/10 16:50:10 UTC

[1/2] beam git commit: This closes #3698

Repository: beam
Updated Branches:
  refs/heads/master cd4f5a27c -> 4e01fc1ac


This closes #3698


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4e01fc1a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4e01fc1a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4e01fc1a

Branch: refs/heads/master
Commit: 4e01fc1acf1df36fb43ccf3f171e5a36147280a1
Parents: cd4f5a2 24ee742
Author: Thomas Groh <tg...@google.com>
Authored: Thu Aug 10 09:49:50 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Aug 10 09:49:50 2017 -0700

----------------------------------------------------------------------
 .../sdk/transforms/windowing/WindowTest.java    | 125 +++++++++----------
 1 file changed, 61 insertions(+), 64 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Update Custom Window Merging Tests

Posted by tg...@apache.org.
Update Custom Window Merging Tests

Update the test CustomWindowFn's mergeWindows so that its result does not depend on the order in which the windows are iterated.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/24ee742d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/24ee742d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/24ee742d

Branch: refs/heads/master
Commit: 24ee742d5c2cc4082a8eecdc01d7b34d8fc8ef54
Parents: cd4f5a2
Author: Thomas Groh <tg...@google.com>
Authored: Mon Aug 7 15:05:59 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Aug 10 09:49:50 2017 -0700

----------------------------------------------------------------------
 .../sdk/transforms/windowing/WindowTest.java    | 125 +++++++++----------
 1 file changed, 61 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/24ee742d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 5b6d046..e2f8c26 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -38,8 +38,12 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.Coder;
@@ -581,20 +585,23 @@ public class WindowTest implements Serializable {
     assertThat(data, not(hasDisplayItem("trigger")));
     assertThat(data, not(hasDisplayItem("allowedLateness")));
   }
+
   @Test
   @Category({ValidatesRunner.class, UsesCustomWindowMerging.class})
   public void testMergingCustomWindows() {
     Instant startInstant = new Instant(0L);
     List<TimestampedValue<String>> input = new ArrayList<>();
-    input.add(TimestampedValue.of("big", startInstant.plus(Duration.standardSeconds(10))));
-    input.add(TimestampedValue.of("small1", startInstant.plus(Duration.standardSeconds(20))));
-    //    This one will be outside of bigWindow thus not merged
-    input.add(TimestampedValue.of("small2", startInstant.plus(Duration.standardSeconds(39))));
-    PCollection<String> inputCollection = pipeline.apply(Create.timestamped(input));
-    PCollection<String> windowedCollection = inputCollection
-        .apply(Window.into(new CustomWindowFn<String>()));
-    PCollection<Long> count = windowedCollection
-        .apply(Combine.globally(Count.<String>combineFn()).withoutDefaults());
+    PCollection<String> inputCollection =
+        pipeline.apply(
+            Create.timestamped(
+                TimestampedValue.of("big", startInstant.plus(Duration.standardSeconds(10))),
+                TimestampedValue.of("small1", startInstant.plus(Duration.standardSeconds(20))),
+                // This one will be outside of bigWindow thus not merged
+                TimestampedValue.of("small2", startInstant.plus(Duration.standardSeconds(39)))));
+    PCollection<String> windowedCollection =
+        inputCollection.apply(Window.into(new CustomWindowFn<String>()));
+    PCollection<Long> count =
+        windowedCollection.apply(Combine.globally(Count.<String>combineFn()).withoutDefaults());
     // "small1" and "big" elements merged into bigWindow "small2" not merged
     // because timestamp is not in bigWindow
     PAssert.that("Wrong number of elements in output collection", count).containsInAnyOrder(2L, 1L);
@@ -607,61 +614,54 @@ public class WindowTest implements Serializable {
   @Category({ValidatesRunner.class, UsesCustomWindowMerging.class})
   public void testMergingCustomWindowsKeyedCollection() {
     Instant startInstant = new Instant(0L);
-    List<TimestampedValue<KV<Integer, String>>> input = new ArrayList<>();
-    input
-        .add(TimestampedValue.of(KV.of(0, "big"), startInstant.plus(Duration.standardSeconds(10))));
-    input.add(
-        TimestampedValue.of(KV.of(1, "small1"), startInstant.plus(Duration.standardSeconds(20))));
-    //    This one will be outside of bigWindow thus not merged
-    input.add(
-        TimestampedValue.of(KV.of(2, "small2"), startInstant.plus(Duration.standardSeconds(39))));
-    PCollection<KV<Integer, String>> inputCollection = pipeline.apply(Create.timestamped(input));
-    PCollection<KV<Integer, String>> windowedCollection = inputCollection
-        .apply(Window.into(new CustomWindowFn<KV<Integer, String>>()));
-    PCollection<Long> count = windowedCollection
-        .apply(Combine.globally(Count.<KV<Integer, String>>combineFn()).withoutDefaults());
+    PCollection<KV<Integer, String>> inputCollection =
+        pipeline.apply(
+            Create.timestamped(
+                TimestampedValue.of(
+                    KV.of(0, "big"), startInstant.plus(Duration.standardSeconds(10))),
+                TimestampedValue.of(
+                    KV.of(1, "small1"), startInstant.plus(Duration.standardSeconds(20))),
+                // This element is not contained within the bigWindow and not merged
+                TimestampedValue.of(
+                    KV.of(2, "small2"), startInstant.plus(Duration.standardSeconds(39)))));
+    PCollection<KV<Integer, String>> windowedCollection =
+        inputCollection.apply(Window.into(new CustomWindowFn<KV<Integer, String>>()));
+    PCollection<Long> count =
+        windowedCollection.apply(
+            Combine.globally(Count.<KV<Integer, String>>combineFn()).withoutDefaults());
     // "small1" and "big" elements merged into bigWindow "small2" not merged
-    // because timestamp is not in bigWindow
+    // because it is not contained in bigWindow
     PAssert.that("Wrong number of elements in output collection", count).containsInAnyOrder(2L, 1L);
     pipeline.run();
   }
 
   private static class CustomWindow extends IntervalWindow {
-
     private boolean isBig;
 
-
-    CustomWindow(Instant start, Instant end) {
-      super(start, end);
-      this.isBig = false;
-    }
-
     CustomWindow(Instant start, Instant end, boolean isBig) {
       super(start, end);
       this.isBig = isBig;
     }
 
-    @Override public boolean equals(Object o) {
+    @Override
+    public boolean equals(Object o) {
       if (this == o) {
         return true;
       }
       if (o == null || getClass() != o.getClass()) {
         return false;
       }
-      if (!super.equals(o)) {
-        return false;
-      }
       CustomWindow that = (CustomWindow) o;
-      return isBig == that.isBig;
+      return super.equals(o) && this.isBig == that.isBig;
     }
 
-    @Override public int hashCode() {
+    @Override
+    public int hashCode() {
       return Objects.hash(super.hashCode(), isBig);
     }
   }
 
-  private static class CustomWindowCoder extends
-      CustomCoder<CustomWindow> {
+  private static class CustomWindowCoder extends CustomCoder<CustomWindow> {
 
     private static final CustomWindowCoder INSTANCE = new CustomWindowCoder();
     private static final Coder<IntervalWindow> INTERVAL_WINDOW_CODER = IntervalWindow.getCoder();
@@ -672,8 +672,7 @@ public class WindowTest implements Serializable {
     }
 
     @Override
-    public void encode(CustomWindow window, OutputStream outStream)
-        throws IOException {
+    public void encode(CustomWindow window, OutputStream outStream) throws IOException {
       INTERVAL_WINDOW_CODER.encode(window, outStream);
       VAR_INT_CODER.encode(window.isBig ? 1 : 0, outStream);
     }
@@ -693,12 +692,12 @@ public class WindowTest implements Serializable {
   }
 
   private static class CustomWindowFn<T> extends WindowFn<T, CustomWindow> {
-
-    @Override public Collection<CustomWindow> assignWindows(AssignContext c) throws Exception {
+    @Override
+    public Collection<CustomWindow> assignWindows(AssignContext c) throws Exception {
       String element;
       // It loses genericity of type T but this is not a big deal for a test.
       // And it allows to avoid duplicating CustomWindowFn to support PCollection<KV>
-      if (c.element() instanceof KV){
+      if (c.element() instanceof KV) {
         element = ((KV<Integer, String>) c.element()).getValue();
       } else {
         element = (String) c.element();
@@ -706,33 +705,34 @@ public class WindowTest implements Serializable {
       // put big elements in windows of 30s and small ones in windows of 5s
       if ("big".equals(element)) {
         return Collections.singletonList(
-            new CustomWindow(c.timestamp(), c.timestamp().plus(Duration.standardSeconds(30)),
-                true));
+            new CustomWindow(
+                c.timestamp(), c.timestamp().plus(Duration.standardSeconds(30)), true));
       } else {
         return Collections.singletonList(
-            new CustomWindow(c.timestamp(), c.timestamp().plus(Duration.standardSeconds(5)),
-                false));
+            new CustomWindow(
+                c.timestamp(), c.timestamp().plus(Duration.standardSeconds(5)), false));
       }
     }
 
     @Override
     public void mergeWindows(MergeContext c) throws Exception {
-      List<CustomWindow> toBeMerged = new ArrayList<>();
-      CustomWindow bigWindow = null;
-      for (CustomWindow customWindow : c.windows()) {
-        if (customWindow.isBig) {
-          bigWindow = customWindow;
-          toBeMerged.add(customWindow);
-        } else if (bigWindow != null
-            && customWindow.start().isAfter(bigWindow.start())
-            && customWindow.end().isBefore(bigWindow.end())) {
-          toBeMerged.add(customWindow);
+      Map<CustomWindow, Set<CustomWindow>> windowsToMerge = new HashMap<>();
+      for (CustomWindow window : c.windows()) {
+        if (window.isBig) {
+          HashSet<CustomWindow> windows = new HashSet<>();
+          windows.add(window);
+          windowsToMerge.put(window, windows);
+        }
+      }
+      for (CustomWindow window : c.windows()) {
+        for (Map.Entry<CustomWindow, Set<CustomWindow>> bigWindow : windowsToMerge.entrySet()) {
+          if (bigWindow.getKey().contains(window)) {
+            bigWindow.getValue().add(window);
+          }
         }
       }
-      // in case bigWindow has not been seen yet
-      if (bigWindow != null) {
-        // merge small windows into big windows
-        c.merge(toBeMerged, bigWindow);
+      for (Map.Entry<CustomWindow, Set<CustomWindow>> mergeEntry : windowsToMerge.entrySet()) {
+        c.merge(mergeEntry.getValue(), mergeEntry.getKey());
       }
     }
 
@@ -750,8 +750,5 @@ public class WindowTest implements Serializable {
     public WindowMappingFn<CustomWindow> getDefaultWindowMappingFn() {
       throw new UnsupportedOperationException("side inputs not supported");
     }
-
-
   }
-
 }