You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/08/19 16:21:06 UTC
[2/4] incubator-beam git commit: Add inEarlyPanesInGlobalWindow as a
PAssert Extractor
Add inEarlyPanesInGlobalWindow as a PAssert Extractor
This is for use in asserting the contents of speculative panes in the
global window.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f15fab8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f15fab8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f15fab8c
Branch: refs/heads/master
Commit: f15fab8ccdb3b40004583e8f7e4e32a0b8ba5121
Parents: bfa3b70
Author: Thomas Groh <tg...@google.com>
Authored: Thu Aug 11 15:46:10 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 19 09:04:18 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/testing/PAssert.java | 18 ++++++++++++++++++
.../apache/beam/sdk/testing/PaneExtractors.java | 18 ++++++++++++++++++
2 files changed, 36 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f15fab8c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index e07ee3d..3f1a741 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
@@ -176,6 +177,13 @@ public class PAssert {
IterableAssert<T> inCombinedNonLatePanes(BoundedWindow window);
/**
+ * Creates a new {@link IterableAssert} like this one, but with the assertion restricted to only
+ * run on panes in the {@link GlobalWindow} that were emitted before the {@link GlobalWindow}
+ * closed. These panes have {@link Timing#EARLY}.
+ */
+ IterableAssert<T> inEarlyGlobalWindowPanes();
+
+ /**
* Asserts that the iterable in question contains the provided elements.
*
* @return the same {@link IterableAssert} builder for further assertions
@@ -381,6 +389,11 @@ public class PAssert {
return withPane(window, PaneExtractors.<T>nonLatePanes());
}
+ @Override
+ public IterableAssert<T> inEarlyGlobalWindowPanes() {
+ return withPane(GlobalWindow.INSTANCE, PaneExtractors.<T>earlyPanes());
+ }
+
private PCollectionContentsAssert<T> withPane(
BoundedWindow window,
SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> paneExtractor) {
@@ -557,6 +570,11 @@ public class PAssert {
return withPanes(window, PaneExtractors.<Iterable<T>>nonLatePanes());
}
+ @Override
+ public IterableAssert<T> inEarlyGlobalWindowPanes() {
+ return withPanes(GlobalWindow.INSTANCE, PaneExtractors.<Iterable<T>>earlyPanes());
+ }
+
private PCollectionSingletonIterableAssert<T> withPanes(
BoundedWindow window,
SimpleFunction<Iterable<WindowedValue<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f15fab8c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
index f699bfc..899612b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
@@ -59,6 +59,10 @@ final class PaneExtractors {
return new ExtractNonLatePanes<>();
}
+ static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> earlyPanes() {
+ return new ExtractEarlyPanes<>();
+ }
+
static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> allPanes() {
return new ExtractAllPanes<>();
}
@@ -137,4 +141,18 @@ final class PaneExtractors {
return outputs;
}
}
+
+ private static class ExtractEarlyPanes<T>
+ extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> {
+ @Override
+ public Iterable<T> apply(Iterable<WindowedValue<T>> input) {
+ List<T> outputs = new ArrayList<>();
+ for (WindowedValue<T> value : input) {
+ if (value.getPane().getTiming() == PaneInfo.Timing.EARLY) {
+ outputs.add(value.getValue());
+ }
+ }
+ return outputs;
+ }
+ }
}