You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/08/19 16:21:06 UTC

[2/4] incubator-beam git commit: Add inEarlyGlobalWindowPanes as a PAssert Extractor

Add inEarlyGlobalWindowPanes as a PAssert Extractor

This is for use in asserting the contents of speculative panes in the
global window, i.e. panes with PaneInfo.Timing.EARLY that are emitted
before the global window closes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f15fab8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f15fab8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f15fab8c

Branch: refs/heads/master
Commit: f15fab8ccdb3b40004583e8f7e4e32a0b8ba5121
Parents: bfa3b70
Author: Thomas Groh <tg...@google.com>
Authored: Thu Aug 11 15:46:10 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 19 09:04:18 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/testing/PAssert.java | 18 ++++++++++++++++++
 .../apache/beam/sdk/testing/PaneExtractors.java   | 18 ++++++++++++++++++
 2 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f15fab8c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index e07ee3d..3f1a741 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
@@ -176,6 +177,13 @@ public class PAssert {
     IterableAssert<T> inCombinedNonLatePanes(BoundedWindow window);
 
     /**
+     * Creates a new {@link IterableAssert} like this one, but with the assertion restricted to only
+     * run on panes in the {@link GlobalWindow} that were emitted before the {@link GlobalWindow}
+     * closed. These panes have {@link Timing#EARLY}.
+     */
+    IterableAssert<T> inEarlyGlobalWindowPanes();
+
+    /**
      * Asserts that the iterable in question contains the provided elements.
      *
      * @return the same {@link IterableAssert} builder for further assertions
@@ -381,6 +389,11 @@ public class PAssert {
       return withPane(window, PaneExtractors.<T>nonLatePanes());
     }
 
+    @Override
+    public IterableAssert<T> inEarlyGlobalWindowPanes() {
+      return withPane(GlobalWindow.INSTANCE, PaneExtractors.<T>earlyPanes());
+    }
+
     private PCollectionContentsAssert<T> withPane(
         BoundedWindow window,
         SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> paneExtractor) {
@@ -557,6 +570,11 @@ public class PAssert {
       return withPanes(window, PaneExtractors.<Iterable<T>>nonLatePanes());
     }
 
+    @Override
+    public IterableAssert<T> inEarlyGlobalWindowPanes() {
+      return withPanes(GlobalWindow.INSTANCE, PaneExtractors.<Iterable<T>>earlyPanes());
+    }
+
     private PCollectionSingletonIterableAssert<T> withPanes(
         BoundedWindow window,
         SimpleFunction<Iterable<WindowedValue<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f15fab8c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
index f699bfc..899612b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
@@ -59,6 +59,10 @@ final class PaneExtractors {
     return new ExtractNonLatePanes<>();
   }
 
+  static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> earlyPanes() {
+    return new ExtractEarlyPanes<>();
+  }
+
   static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> allPanes() {
     return new ExtractAllPanes<>();
   }
@@ -137,4 +141,18 @@ final class PaneExtractors {
       return outputs;
     }
   }
+
+  private static class ExtractEarlyPanes<T>
+      extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> {
+    @Override
+    public Iterable<T> apply(Iterable<WindowedValue<T>> input) {
+      List<T> outputs = new ArrayList<>();
+      for (WindowedValue<T> value : input) {
+        if (value.getPane().getTiming() == PaneInfo.Timing.EARLY) {
+          outputs.add(value.getValue());
+        }
+      }
+      return outputs;
+    }
+  }
 }