You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/24 21:24:11 UTC
[4/5] incubator-beam git commit: Use GatherAllPanes in PAssert
Use GatherAllPanes in PAssert
Instead of explicitly grouping by key, gather all the panes across the
input window.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1f449881
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1f449881
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1f449881
Branch: refs/heads/master
Commit: 1f449881a88d9927b507ad46d0004e3c9805513a
Parents: ec1bb3a
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jun 20 14:38:11 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 24 13:33:13 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/testing/PAssert.java | 369 +++++++++++++++----
.../apache/beam/sdk/testing/PAssertTest.java | 116 ++++++
2 files changed, 418 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f449881/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 1a3d85d..883b2b3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.testing;
import static com.google.common.base.Preconditions.checkState;
-
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
@@ -36,18 +35,24 @@ import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.GatherAllPanes;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -63,9 +68,11 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -116,6 +123,14 @@ public class PAssert {
* Builder interface for assertions applicable to iterables and PCollection contents.
*/
public interface IterableAssert<T> {
+ /**
+ * Creates a new {@link IterableAssert} like this one, but with the assertion restricted to only
+ * run on the provided window.
+ *
+ * @return a new {@link IterableAssert} like this one but with the assertion only applied to the
+ * specified window.
+ */
+ IterableAssert<T> inWindow(BoundedWindow window);
/**
* Asserts that the iterable in question contains the provided elements.
@@ -152,6 +167,15 @@ public class PAssert {
*/
public interface SingletonAssert<T> {
/**
+ * Creates a new {@link SingletonAssert} like this one, but with the assertion restricted to
+ * only run on the provided window.
+ *
+ * @return a new {@link SingletonAssert} like this one but with the assertion only applied to
+ * the specified window.
+ */
+ SingletonAssert<T> inWindow(BoundedWindow window);
+
+ /**
* Asserts that the value in question is equal to the provided value, according to
* {@link Object#equals}.
*
@@ -250,9 +274,23 @@ public class PAssert {
*/
private static class PCollectionContentsAssert<T> implements IterableAssert<T> {
private final PCollection<T> actual;
+ private final AssertionWindows rewindowingStrategy;
public PCollectionContentsAssert(PCollection<T> actual) {
+ this(actual, IntoGlobalWindow.<T>of());
+ }
+
+ public PCollectionContentsAssert(PCollection<T> actual, AssertionWindows rewindowingStrategy) {
this.actual = actual;
+ this.rewindowingStrategy = rewindowingStrategy;
+ }
+
+ @Override
+ public PCollectionContentsAssert<T> inWindow(BoundedWindow window) {
+ // Raw cast of the actual PCollection's window coder so it can be handed to
+ // IntoStaticWindows together with the caller-supplied window.
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Coder<BoundedWindow> windowCoder =
+ (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
+ // Returns a new assertion over the same PCollection; this instance is not modified.
+ return new PCollectionContentsAssert<>(actual, IntoStaticWindows.<T>of(windowCoder, window));
}
/**
@@ -285,7 +323,7 @@ public class PAssert {
@Override
public PCollectionContentsAssert<T> satisfies(
SerializableFunction<Iterable<T>, Void> checkerFn) {
- actual.apply(nextAssertionName(), new GroupThenAssert<>(checkerFn));
+ actual.apply(nextAssertionName(), new GroupThenAssert<>(checkerFn, rewindowingStrategy));
return this;
}
@@ -325,7 +363,8 @@ public class PAssert {
@SuppressWarnings({"rawtypes", "unchecked"})
SerializableFunction<Iterable<T>, Void> checkerFn =
(SerializableFunction) new MatcherCheckerFn<>(matcher);
- actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn));
+ actual.apply(
+ "PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn, rewindowingStrategy));
return this;
}
@@ -374,13 +413,30 @@ public class PAssert {
private static class PCollectionSingletonIterableAssert<T> implements IterableAssert<T> {
private final PCollection<Iterable<T>> actual;
private final Coder<T> elementCoder;
+ private final AssertionWindows rewindowingStrategy;
public PCollectionSingletonIterableAssert(PCollection<Iterable<T>> actual) {
+ this(actual, IntoGlobalWindow.<Iterable<T>>of());
+ }
+
+ public PCollectionSingletonIterableAssert(
+ PCollection<Iterable<T>> actual, AssertionWindows rewindowingStrategy) {
this.actual = actual;
@SuppressWarnings("unchecked")
Coder<T> typedCoder = (Coder<T>) actual.getCoder().getCoderArguments().get(0);
this.elementCoder = typedCoder;
+
+ this.rewindowingStrategy = rewindowingStrategy;
+ }
+
+ @Override
+ public PCollectionSingletonIterableAssert<T> inWindow(BoundedWindow window) {
+ // Raw cast of the actual PCollection's window coder so it can be handed to
+ // IntoStaticWindows together with the caller-supplied window.
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Coder<BoundedWindow> windowCoder =
+ (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
+ // Returns a new assertion over the same PCollection; this instance is not modified.
+ return new PCollectionSingletonIterableAssert<>(
+ actual, IntoStaticWindows.<Iterable<T>>of(windowCoder, window));
}
@Override
@@ -402,7 +458,9 @@ public class PAssert {
@Override
public PCollectionSingletonIterableAssert<T> satisfies(
SerializableFunction<Iterable<T>, Void> checkerFn) {
- actual.apply("PAssert$" + (assertCount++), new GroupThenAssertForSingleton<>(checkerFn));
+ actual.apply(
+ "PAssert$" + (assertCount++),
+ new GroupThenAssertForSingleton<>(checkerFn, rewindowingStrategy));
return this;
}
@@ -421,18 +479,38 @@ public class PAssert {
private static class PCollectionViewAssert<ElemT, ViewT> implements SingletonAssert<ViewT> {
private final PCollection<ElemT> actual;
private final PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view;
+ private final AssertionWindows rewindowActuals;
private final Coder<ViewT> coder;
protected PCollectionViewAssert(
PCollection<ElemT> actual,
PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view,
Coder<ViewT> coder) {
+ this(actual, view, IntoGlobalWindow.<ElemT>of(), coder);
+ }
+
+ private PCollectionViewAssert(
+ PCollection<ElemT> actual,
+ PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view,
+ AssertionWindows rewindowActuals,
+ Coder<ViewT> coder) {
this.actual = actual;
this.view = view;
+ this.rewindowActuals = rewindowActuals;
this.coder = coder;
}
@Override
+ public PCollectionViewAssert<ElemT, ViewT> inWindow(BoundedWindow window) {
+ // Scope the raw cast to a typed local and suppress the resulting warning explicitly,
+ // matching the other inWindow implementations in this file.
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Coder<BoundedWindow> windowCoder =
+ (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
+ // Returns a new assertion over the same PCollection and view; this instance is not modified.
+ return new PCollectionViewAssert<>(
+ actual, view, IntoStaticWindows.of(windowCoder, window), coder);
+ }
+
+ @Override
public PCollectionViewAssert<ElemT, ViewT> isEqualTo(ViewT expectedValue) {
return satisfies(new AssertIsEqualToRelation<ViewT>(), expectedValue);
}
@@ -449,7 +527,10 @@ public class PAssert {
.getPipeline()
.apply(
"PAssert$" + (assertCount++),
- new OneSideInputAssert<ViewT>(CreateActual.from(actual, view), checkerFn));
+ new OneSideInputAssert<ViewT>(
+ CreateActual.from(actual, rewindowActuals, view),
+ rewindowActuals.<Integer>windowDummy(),
+ checkerFn));
return this;
}
@@ -496,16 +577,22 @@ public class PAssert {
extends PTransform<PBegin, PCollectionView<ActualT>> {
private final transient PCollection<T> actual;
+ private final transient AssertionWindows rewindowActuals;
private final transient PTransform<PCollection<T>, PCollectionView<ActualT>> actualView;
public static <T, ActualT> CreateActual<T, ActualT> from(
- PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
- return new CreateActual<>(actual, actualView);
+ PCollection<T> actual,
+ AssertionWindows rewindowActuals,
+ PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
+ return new CreateActual<>(actual, rewindowActuals, actualView);
}
private CreateActual(
- PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
+ PCollection<T> actual,
+ AssertionWindows rewindowActuals,
+ PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
this.actual = actual;
+ this.rewindowActuals = rewindowActuals;
this.actualView = actualView;
}
@@ -513,7 +600,8 @@ public class PAssert {
public PCollectionView<ActualT> apply(PBegin input) {
final Coder<T> coder = actual.getCoder();
return actual
- .apply(Window.<T>into(new GlobalWindows()))
+ .apply("FilterActuals", rewindowActuals.<T>prepareActuals())
+ .apply("RewindowActuals", rewindowActuals.<T>windowActuals())
.apply(
ParDo.of(
new DoFn<T, T>() {
@@ -565,84 +653,83 @@ public class PAssert {
* <p>If the {@link PCollection} is empty, this transform returns a {@link PCollection} containing
* a single empty iterable, even though in practice most runners will not produce any element.
*/
- private static class GroupGlobally<T> extends PTransform<PCollection<T>, PCollection<Iterable<T>>>
+ private static class GroupGlobally<T>
+ extends PTransform<PCollection<T>, PCollection<Iterable<WindowedValue<T>>>>
implements Serializable {
+ private final AssertionWindows rewindowingStrategy;
- public GroupGlobally() {}
+ public GroupGlobally(AssertionWindows rewindowingStrategy) {
+ this.rewindowingStrategy = rewindowingStrategy;
+ }
@Override
- public PCollection<Iterable<T>> apply(PCollection<T> input) {
-
- final int contentsKey = 0;
- final int dummyKey = 1;
+ public PCollection<Iterable<WindowedValue<T>>> apply(PCollection<T> input) {
final int combinedKey = 42;
+ // Remove the triggering on both the grouped actual contents and the dummy element
+ PTransform<
+ PCollection<KV<Integer, Iterable<WindowedValue<T>>>>,
+ PCollection<KV<Integer, Iterable<WindowedValue<T>>>>>
+ removeTriggering =
+ Window.<KV<Integer, Iterable<WindowedValue<T>>>>triggering(Never.ever())
+ .discardingFiredPanes()
+ .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness());
// Group the contents by key. If it is empty, this PCollection will be empty, too.
// Then key it again with a dummy key.
- PCollection<KV<Integer, KV<Integer, Iterable<T>>>> doubleKeyedGroupedInput =
+ PCollection<KV<Integer, Iterable<WindowedValue<T>>>> groupedContents =
+ // TODO: Split the filtering from the rewindowing, and apply filtering before the Gather
+ // if the grouping of extra records
input
- .apply("GloballyWindow", Window.<T>into(new GlobalWindows()))
- .apply("ContentsWithKeys", WithKeys.<Integer, T>of(contentsKey))
+ .apply(rewindowingStrategy.<T>prepareActuals())
+ .apply("GatherAllOutputs", GatherAllPanes.<T>globally())
.apply(
- "NeverTriggerContents",
- Window.<KV<Integer, T>>triggering(Never.ever()).discardingFiredPanes())
- .apply("ContentsGBK", GroupByKey.<Integer, T>create())
- .apply(
- "DoubleKeyContents", WithKeys.<Integer, KV<Integer, Iterable<T>>>of(combinedKey));
+ "RewindowActuals",
+ rewindowingStrategy.<Iterable<WindowedValue<T>>>windowActuals())
+ .apply("KeyForDummy", WithKeys.<Integer, Iterable<WindowedValue<T>>>of(combinedKey))
+ .apply("RemoveActualsTriggering", removeTriggering);
// Create another non-empty PCollection that is keyed with a distinct dummy key
- PCollection<KV<Integer, KV<Integer, Iterable<T>>>> doubleKeyedDummy =
+ PCollection<KV<Integer, Iterable<WindowedValue<T>>>> keyedDummy =
input
.getPipeline()
.apply(
Create.of(
KV.of(
combinedKey,
- KV.of(dummyKey, (Iterable<T>) Collections.<T>emptyList())))
- .withCoder(doubleKeyedGroupedInput.getCoder()))
- .setWindowingStrategyInternal(doubleKeyedGroupedInput.getWindowingStrategy());
+ (Iterable<WindowedValue<T>>)
+ Collections.<WindowedValue<T>>emptyList()))
+ .withCoder(groupedContents.getCoder()))
+ .apply(
+ "WindowIntoDummy",
+ rewindowingStrategy.<KV<Integer, Iterable<WindowedValue<T>>>>windowDummy())
+ .apply("RemoveDummyTriggering", removeTriggering);
// Flatten them together and group by the combined key to get a single element
- PCollection<KV<Integer, Iterable<KV<Integer, Iterable<T>>>>> dummyAndContents =
- PCollectionList.<KV<Integer, KV<Integer, Iterable<T>>>>of(doubleKeyedGroupedInput)
- .and(doubleKeyedDummy)
+ PCollection<KV<Integer, Iterable<Iterable<WindowedValue<T>>>>> dummyAndContents =
+ PCollectionList.of(groupedContents)
+ .and(keyedDummy)
.apply(
"FlattenDummyAndContents",
- Flatten.<KV<Integer, KV<Integer, Iterable<T>>>>pCollections())
+ Flatten.<KV<Integer, Iterable<WindowedValue<T>>>>pCollections())
.apply(
- "GroupDummyAndContents", GroupByKey.<Integer, KV<Integer, Iterable<T>>>create());
+ "NeverTrigger",
+ Window.<KV<Integer, Iterable<WindowedValue<T>>>>triggering(Never.ever())
+ .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
+ .discardingFiredPanes())
+ .apply(
+ "GroupDummyAndContents",
+ GroupByKey.<Integer, Iterable<WindowedValue<T>>>create());
- // Extract the contents if they exist else empty contents.
return dummyAndContents
- .apply(
- "GetContents",
- ParDo.of(
- new DoFn<KV<Integer, Iterable<KV<Integer, Iterable<T>>>>, Iterable<T>>() {
- @Override
- public void processElement(ProcessContext ctx) {
- Iterable<KV<Integer, Iterable<T>>> groupedDummyAndContents =
- ctx.element().getValue();
-
- if (Iterables.size(groupedDummyAndContents) == 1) {
- // Only the dummy value, so just output empty
- ctx.output(Collections.<T>emptyList());
- } else {
- checkState(
- Iterables.size(groupedDummyAndContents) == 2,
- "Internal error: PAssert grouped contents with a"
- + " dummy value resulted in more than 2 groupings: %s",
- groupedDummyAndContents);
-
- if (Iterables.get(groupedDummyAndContents, 0).getKey() == contentsKey) {
- // The first iterable in the group holds the real contents
- ctx.output(Iterables.get(groupedDummyAndContents, 0).getValue());
- } else {
- // The second iterable holds the real contents
- ctx.output(Iterables.get(groupedDummyAndContents, 1).getValue());
- }
- }
- }
- }));
+ .apply(Values.<Iterable<Iterable<WindowedValue<T>>>>create())
+ .apply(ParDo.of(new ConcatFn<WindowedValue<T>>()));
+ }
+ }
+
+ /**
+ * A {@link DoFn} that flattens each input element, an iterable of iterables, into a single
+ * iterable and outputs it.
+ */
+ private static final class ConcatFn<T> extends DoFn<Iterable<Iterable<T>>, Iterable<T>> {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ Iterable<Iterable<T>> nested = c.element();
+ Iterable<T> flattened = Iterables.concat(nested);
+ c.output(flattened);
}
}
@@ -653,15 +740,20 @@ public class PAssert {
public static class GroupThenAssert<T> extends PTransform<PCollection<T>, PDone>
implements Serializable {
private final SerializableFunction<Iterable<T>, Void> checkerFn;
+ private final AssertionWindows rewindowingStrategy;
- private GroupThenAssert(SerializableFunction<Iterable<T>, Void> checkerFn) {
+ private GroupThenAssert(
+ SerializableFunction<Iterable<T>, Void> checkerFn, AssertionWindows rewindowingStrategy) {
this.checkerFn = checkerFn;
+ this.rewindowingStrategy = rewindowingStrategy;
}
@Override
public PDone apply(PCollection<T> input) {
input
- .apply("GroupGlobally", new GroupGlobally<T>())
+ .apply("GroupGlobally", new GroupGlobally<T>(rewindowingStrategy))
+ .apply("GetOnlyPane", ParDo.of(new ExtractOnlyPane<T>()))
+ .setCoder(IterableCoder.of(input.getCoder()))
.apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn)));
return PDone.in(input.getPipeline());
@@ -675,15 +767,20 @@ public class PAssert {
public static class GroupThenAssertForSingleton<T>
extends PTransform<PCollection<Iterable<T>>, PDone> implements Serializable {
private final SerializableFunction<Iterable<T>, Void> checkerFn;
+ private final AssertionWindows rewindowingStrategy;
- private GroupThenAssertForSingleton(SerializableFunction<Iterable<T>, Void> checkerFn) {
+ private GroupThenAssertForSingleton(
+ SerializableFunction<Iterable<T>, Void> checkerFn, AssertionWindows rewindowingStrategy) {
this.checkerFn = checkerFn;
+ this.rewindowingStrategy = rewindowingStrategy;
}
@Override
public PDone apply(PCollection<Iterable<T>> input) {
input
- .apply("GroupGlobally", new GroupGlobally<Iterable<T>>())
+ .apply("GroupGlobally", new GroupGlobally<Iterable<T>>(rewindowingStrategy))
+ .apply("GetOnlyPane", ParDo.of(new ExtractOnlyPane<Iterable<T>>()))
+ .setCoder(IterableCoder.of(input.getCoder()))
.apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn)));
return PDone.in(input.getPipeline());
@@ -703,12 +800,15 @@ public class PAssert {
public static class OneSideInputAssert<ActualT> extends PTransform<PBegin, PDone>
implements Serializable {
private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
+ private final transient PTransform<PCollection<Integer>, PCollection<Integer>> windowToken;
private final SerializableFunction<ActualT, Void> checkerFn;
private OneSideInputAssert(
PTransform<PBegin, PCollectionView<ActualT>> createActual,
+ PTransform<PCollection<Integer>, PCollection<Integer>> windowToken,
SerializableFunction<ActualT, Void> checkerFn) {
this.createActual = createActual;
+ this.windowToken = windowToken;
this.checkerFn = checkerFn;
}
@@ -718,7 +818,9 @@ public class PAssert {
input
.apply(Create.of(0).withCoder(VarIntCoder.of()))
- .apply("RunChecks",
+ .apply("WindowToken", windowToken)
+ .apply(
+ "RunChecks",
ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual)));
return PDone.in(input.getPipeline());
@@ -760,6 +862,23 @@ public class PAssert {
}
}
+ /**
+ * A {@link DoFn} that unwraps an iterable of {@link WindowedValue WindowedValues} into a list of
+ * the contained values, failing if any value did not come from a pane that is both the first
+ * and the last pane.
+ */
+ private static class ExtractOnlyPane<T> extends DoFn<Iterable<WindowedValue<T>>, Iterable<T>> {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ List<T> outputs = new ArrayList<>();
+ for (WindowedValue<T> value : c.element()) {
+ // A pane that is both first and last is the only pane produced for its window.
+ checkState(
+ value.getPane().isFirst() && value.getPane().isLast(),
+ // Trailing space after "got" keeps the concatenated message from reading "gota value".
+ "Expected elements to be produced by a trigger that fires at most once, but got "
+ + "a value in a pane that is %s. Actual Pane Info: %s",
+ value.getPane().isFirst() ? "not the last pane" : "not the first pane",
+ value.getPane());
+ outputs.add(value.getValue());
+ }
+ c.output(outputs);
+ }
+ }
+
/**
* A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
* the single iterable element of the input {@link PCollection} and adjusts counters and
@@ -948,4 +1067,120 @@ public class PAssert {
return new AssertContainsInAnyOrder<T>(expectedElements);
}
}
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * A strategy for filtering and rewindowing the actual and dummy {@link PCollection PCollections}
+ * within a {@link PAssert}.
+ *
+ * <p>This must ensure that the windowing strategies of the output of {@link #windowActuals()} and
+ * {@link #windowDummy()} are compatible (and can be {@link Flatten Flattened}).
+ *
+ * <p>The {@link PCollection} produced by {@link #prepareActuals()} will be a parent (though not
+ * a direct parent) of the transform provided to {@link #windowActuals()}.
+ */
+ private interface AssertionWindows {
+ /**
+ * Returns a transform that assigns the dummy element into the appropriate
+ * {@link BoundedWindow windows}.
+ *
+ * @param <T> the element type of the {@link PCollection} the transform is applied to
+ */
+ <T> PTransform<PCollection<T>, PCollection<T>> windowDummy();
+
+ /**
+ * Returns a transform that filters and reassigns windows of the actual elements if necessary.
+ *
+ * @param <T> the element type of the {@link PCollection} the transform is applied to
+ */
+ <T> PTransform<PCollection<T>, PCollection<T>> prepareActuals();
+
+ /**
+ * Returns a transform that assigns the actual elements into the appropriate
+ * {@link BoundedWindow windows}. Will be called after {@link #prepareActuals()}.
+ *
+ * @param <T> the element type of the {@link PCollection} the transform is applied to
+ */
+ <T> PTransform<PCollection<T>, PCollection<T>> windowActuals();
+ }
+
+ /**
+ * An {@link AssertionWindows} that places every element, dummy or actual, into the
+ * {@link GlobalWindow}.
+ */
+ private static class IntoGlobalWindow implements AssertionWindows, Serializable {
+ public static AssertionWindows of() {
+ return new IntoGlobalWindow();
+ }
+
+ /** Assigns the dummy element to the {@link GlobalWindow}. */
+ @Override
+ public <T> PTransform<PCollection<T>, PCollection<T>> windowDummy() {
+ return Window.<T>into(new GlobalWindows());
+ }
+
+ /**
+ * Rewindows all input elements into the {@link GlobalWindow}, so that the resulting
+ * PCollection holds every element of the input when no window has been specified.
+ */
+ @Override
+ public <T> PTransform<PCollection<T>, PCollection<T>> prepareActuals() {
+ return Window.<T>into(new GlobalWindows());
+ }
+
+ /** Assigns the actual elements to the {@link GlobalWindow}. */
+ @Override
+ public <T> PTransform<PCollection<T>, PCollection<T>> windowActuals() {
+ return Window.<T>into(new GlobalWindows());
+ }
+ }
+
+ private static class IntoStaticWindows implements AssertionWindows {
+ private final StaticWindows windowFn;
+
+ public static AssertionWindows of(Coder<BoundedWindow> windowCoder, BoundedWindow window) {
+ return new IntoStaticWindows(StaticWindows.of(windowCoder, window));
+ }
+
+ private IntoStaticWindows(StaticWindows windowFn) {
+ this.windowFn = windowFn;
+ }
+
+ @Override
+ public <T> PTransform<PCollection<T>, PCollection<T>> windowDummy() {
+ return Window.into(windowFn);
+ }
+
+ @Override
+ public <T> PTransform<PCollection<T>, PCollection<T>> prepareActuals() {
+ return new FilterWindows<>(windowFn);
+ }
+
+ @Override
+ public <T> PTransform<PCollection<T>, PCollection<T>> windowActuals() {
+ return Window.into(windowFn.intoOnlyExisting());
+ }
+ }
+
+ /**
+ * A DoFn that filters elements based on their presence in a static collection of windows.
+ */
+ private static final class FilterWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
+ private final StaticWindows windows;
+
+ public FilterWindows(StaticWindows windows) {
+ this.windows = windows;
+ }
+
+ @Override
+ public PCollection<T> apply(PCollection<T> input) {
+ return input.apply("FilterWindows", ParDo.of(new Fn()));
+ }
+
+ private class Fn extends DoFn<T, T> implements RequiresWindowAccess {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ if (windows.getWindows().contains(c.window())) {
+ c.output(c.element());
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f449881/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index fdc8719..bafd897 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.testing;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -25,11 +27,21 @@ import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import com.google.common.collect.Iterables;
import com.fasterxml.jackson.annotation.JsonCreator;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -148,6 +160,45 @@ public class PAssertTest implements Serializable {
}
/**
+ * A {@link PAssert} about the contents of a {@link PCollection}, restricted to a single
+ * window with {@code inWindow}, is allowed to be verified by an arbitrary
+ * {@link SerializableFunction}.
+ *
+ * <p>The input is windowed into 300ms fixed windows, so the element at 250ms is checked in
+ * [0, 300) and the element at 500ms is checked in [300, 600).
+ */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testWindowedSerializablePredicate() throws Exception {
+ Pipeline pipeline = TestPipeline.create();
+
+ PCollection<NotSerializableObject> pcollection = pipeline
+ .apply(Create.timestamped(
+ TimestampedValue.of(new NotSerializableObject(), new Instant(250L)),
+ TimestampedValue.of(new NotSerializableObject(), new Instant(500L)))
+ .withCoder(NotSerializableObjectCoder.of()))
+ .apply(Window.<NotSerializableObject>into(FixedWindows.of(Duration.millis(300L))));
+
+ PAssert.that(pcollection)
+ .inWindow(new IntervalWindow(new Instant(0L), new Instant(300L)))
+ .satisfies(new SerializableFunction<Iterable<NotSerializableObject>, Void>() {
+ @Override
+ public Void apply(Iterable<NotSerializableObject> contents) {
+ assertThat(Iterables.isEmpty(contents), is(false));
+ return null; // no problem!
+ }
+ });
+ PAssert.that(pcollection)
+ .inWindow(new IntervalWindow(new Instant(300L), new Instant(600L)))
+ .satisfies(new SerializableFunction<Iterable<NotSerializableObject>, Void>() {
+ @Override
+ public Void apply(Iterable<NotSerializableObject> contents) {
+ assertThat(Iterables.isEmpty(contents), is(false));
+ return null; // no problem!
+ }
+ });
+
+ pipeline.run();
+ }
+
+ /**
* Test that we throw an error at pipeline construction time when the user mistakenly uses
* {@code PAssert.thatSingleton().equals()} instead of the test method {@code .isEqualTo}.
*/
@@ -220,6 +271,26 @@ public class PAssertTest implements Serializable {
}
/**
+ * Basic test for {@code isEqualTo} on a singleton assertion restricted to one window with
+ * {@code inWindow}: with 500ms fixed windows, 43 (at 250ms) is the only element of [0, 500)
+ * and 22 (at -250ms) is the only element of [-500, 0).
+ */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testWindowedIsEqualTo() throws Exception {
+ Pipeline pipeline = TestPipeline.create();
+ PCollection<Integer> pcollection =
+ pipeline.apply(Create.timestamped(TimestampedValue.of(43, new Instant(250L)),
+ TimestampedValue.of(22, new Instant(-250L))))
+ .apply(Window.<Integer>into(FixedWindows.of(Duration.millis(500L))));
+ PAssert.thatSingleton(pcollection)
+ .inWindow(new IntervalWindow(new Instant(0L), new Instant(500L)))
+ .isEqualTo(43);
+ PAssert.thatSingleton(pcollection)
+ .inWindow(new IntervalWindow(new Instant(-500L), new Instant(0L)))
+ .isEqualTo(22);
+ pipeline.run();
+ }
+
+ /**
* Basic test for {@code notEqualTo}.
*/
@Test
@@ -244,6 +315,51 @@ public class PAssertTest implements Serializable {
}
/**
+ * Tests that {@code containsInAnyOrder} is actually order-independent when the assertion is
+ * explicitly restricted to {@link GlobalWindow#INSTANCE} with {@code inWindow}.
+ */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testGlobalWindowContainsInAnyOrder() throws Exception {
+ Pipeline pipeline = TestPipeline.create();
+ PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
+ PAssert.that(pcollection).inWindow(GlobalWindow.INSTANCE).containsInAnyOrder(2, 1, 4, 3);
+ pipeline.run();
+ }
+
+ /**
+ * Tests that windowed {@code containsInAnyOrder} is actually order-independent.
+ *
+ * <p>The input uses 200ms sliding windows that start every 100ms at an offset of 50ms, so each
+ * element falls into two overlapping windows; every window from [-50, 150) through [350, 550)
+ * is asserted on separately.
+ */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testWindowedContainsInAnyOrder() throws Exception {
+ Pipeline pipeline = TestPipeline.create();
+ PCollection<Integer> pcollection =
+ pipeline.apply(Create.timestamped(TimestampedValue.of(1, new Instant(100L)),
+ TimestampedValue.of(2, new Instant(200L)),
+ TimestampedValue.of(3, new Instant(300L)),
+ TimestampedValue.of(4, new Instant(400L))))
+ .apply(Window.<Integer>into(SlidingWindows.of(Duration.millis(200L))
+ .every(Duration.millis(100L))
+ .withOffset(Duration.millis(50L))));
+
+ PAssert.that(pcollection)
+ .inWindow(new IntervalWindow(new Instant(-50L), new Instant(150L))).containsInAnyOrder(1);
+ PAssert.that(pcollection)
+ .inWindow(new IntervalWindow(new Instant(50L), new Instant(250L)))
+ .containsInAnyOrder(2, 1);
+ PAssert.that(pcollection)
+ .inWindow(new IntervalWindow(new Instant(150L), new Instant(350L)))
+ .containsInAnyOrder(2, 3);
+ PAssert.that(pcollection)
+ .inWindow(new IntervalWindow(new Instant(250L), new Instant(450L)))
+ .containsInAnyOrder(4, 3);
+ PAssert.that(pcollection)
+ .inWindow(new IntervalWindow(new Instant(350L), new Instant(550L)))
+ .containsInAnyOrder(4);
+ pipeline.run();
+ }
+
+ /**
* Tests that {@code containsInAnyOrder} fails when and how it should.
*/
@Test