You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/24 21:24:11 UTC

[4/5] incubator-beam git commit: Use GatherAllPanes in PAssert

Use GatherAllPanes in PAssert

Instead of explicitly grouping by key, gather all the panes across the
input window.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1f449881
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1f449881
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1f449881

Branch: refs/heads/master
Commit: 1f449881a88d9927b507ad46d0004e3c9805513a
Parents: ec1bb3a
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jun 20 14:38:11 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 24 13:33:13 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/testing/PAssert.java    | 369 +++++++++++++++----
 .../apache/beam/sdk/testing/PAssertTest.java    | 116 ++++++
 2 files changed, 418 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f449881/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 1a3d85d..883b2b3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.testing;
 
 import static com.google.common.base.Preconditions.checkState;
-
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
@@ -36,18 +35,24 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Never;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.GatherAllPanes;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -63,9 +68,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
@@ -116,6 +123,14 @@ public class PAssert {
    * Builder interface for assertions applicable to iterables and PCollection contents.
    */
   public interface IterableAssert<T> {
+    /**
+     * Creates a new {@link IterableAssert} like this one, but with the assertion restricted to only
+     * run on the provided window.
+     *
+     * @return a new {@link IterableAssert} like this one but with the assertion only applied to the
+     * specified window.
+     */
+    IterableAssert<T> inWindow(BoundedWindow window);
 
     /**
      * Asserts that the iterable in question contains the provided elements.
@@ -152,6 +167,15 @@ public class PAssert {
    */
   public interface SingletonAssert<T> {
     /**
+     * Creates a new {@link SingletonAssert} like this one, but with the assertion restricted to
+     * only run on the provided window.
+     *
+     * @return a new {@link SingletonAssert} like this one but with the assertion only applied to
+     * the specified window.
+     */
+    SingletonAssert<T> inWindow(BoundedWindow window);
+
+    /**
      * Asserts that the value in question is equal to the provided value, according to
      * {@link Object#equals}.
      *
@@ -250,9 +274,23 @@ public class PAssert {
    */
   private static class PCollectionContentsAssert<T> implements IterableAssert<T> {
     private final PCollection<T> actual;
+    private final AssertionWindows rewindowingStrategy;
 
     public PCollectionContentsAssert(PCollection<T> actual) {
+      this(actual, IntoGlobalWindow.<T>of());
+    }
+
+    public PCollectionContentsAssert(PCollection<T> actual, AssertionWindows rewindowingStrategy) {
       this.actual = actual;
+      this.rewindowingStrategy = rewindowingStrategy;
+    }
+
+    @Override
+    public PCollectionContentsAssert<T> inWindow(BoundedWindow window) {
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      Coder<BoundedWindow> windowCoder =
+          (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
+      return new PCollectionContentsAssert<>(actual, IntoStaticWindows.<T>of(windowCoder, window));
     }
 
     /**
@@ -285,7 +323,7 @@ public class PAssert {
     @Override
     public PCollectionContentsAssert<T> satisfies(
         SerializableFunction<Iterable<T>, Void> checkerFn) {
-      actual.apply(nextAssertionName(), new GroupThenAssert<>(checkerFn));
+      actual.apply(nextAssertionName(), new GroupThenAssert<>(checkerFn, rewindowingStrategy));
       return this;
     }
 
@@ -325,7 +363,8 @@ public class PAssert {
       @SuppressWarnings({"rawtypes", "unchecked"})
       SerializableFunction<Iterable<T>, Void> checkerFn =
           (SerializableFunction) new MatcherCheckerFn<>(matcher);
-      actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn));
+      actual.apply(
+          "PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn, rewindowingStrategy));
       return this;
     }
 
@@ -374,13 +413,30 @@ public class PAssert {
   private static class PCollectionSingletonIterableAssert<T> implements IterableAssert<T> {
     private final PCollection<Iterable<T>> actual;
     private final Coder<T> elementCoder;
+    private final AssertionWindows rewindowingStrategy;
 
     public PCollectionSingletonIterableAssert(PCollection<Iterable<T>> actual) {
+      this(actual, IntoGlobalWindow.<Iterable<T>>of());
+    }
+
+    public PCollectionSingletonIterableAssert(
+        PCollection<Iterable<T>> actual, AssertionWindows rewindowingStrategy) {
       this.actual = actual;
 
       @SuppressWarnings("unchecked")
       Coder<T> typedCoder = (Coder<T>) actual.getCoder().getCoderArguments().get(0);
       this.elementCoder = typedCoder;
+
+      this.rewindowingStrategy = rewindowingStrategy;
+    }
+
+    @Override
+    public PCollectionSingletonIterableAssert<T> inWindow(BoundedWindow window) {
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      Coder<BoundedWindow> windowCoder =
+          (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
+      return new PCollectionSingletonIterableAssert<>(
+          actual, IntoStaticWindows.<Iterable<T>>of(windowCoder, window));
     }
 
     @Override
@@ -402,7 +458,9 @@ public class PAssert {
     @Override
     public PCollectionSingletonIterableAssert<T> satisfies(
         SerializableFunction<Iterable<T>, Void> checkerFn) {
-      actual.apply("PAssert$" + (assertCount++), new GroupThenAssertForSingleton<>(checkerFn));
+      actual.apply(
+          "PAssert$" + (assertCount++),
+          new GroupThenAssertForSingleton<>(checkerFn, rewindowingStrategy));
       return this;
     }
 
@@ -421,18 +479,38 @@ public class PAssert {
   private static class PCollectionViewAssert<ElemT, ViewT> implements SingletonAssert<ViewT> {
     private final PCollection<ElemT> actual;
     private final PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view;
+    private final AssertionWindows rewindowActuals;
     private final Coder<ViewT> coder;
 
     protected PCollectionViewAssert(
         PCollection<ElemT> actual,
         PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view,
         Coder<ViewT> coder) {
+      this(actual, view, IntoGlobalWindow.<ElemT>of(), coder);
+    }
+
+    private PCollectionViewAssert(
+        PCollection<ElemT> actual,
+        PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view,
+        AssertionWindows rewindowActuals,
+        Coder<ViewT> coder) {
       this.actual = actual;
       this.view = view;
+      this.rewindowActuals = rewindowActuals;
       this.coder = coder;
     }
 
     @Override
+    public PCollectionViewAssert<ElemT, ViewT> inWindow(BoundedWindow window) {
+      return new PCollectionViewAssert<>(
+          actual,
+          view,
+          IntoStaticWindows.of(
+              (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder(), window),
+          coder);
+    }
+
+    @Override
     public PCollectionViewAssert<ElemT, ViewT> isEqualTo(ViewT expectedValue) {
       return satisfies(new AssertIsEqualToRelation<ViewT>(), expectedValue);
     }
@@ -449,7 +527,10 @@ public class PAssert {
           .getPipeline()
           .apply(
               "PAssert$" + (assertCount++),
-              new OneSideInputAssert<ViewT>(CreateActual.from(actual, view), checkerFn));
+              new OneSideInputAssert<ViewT>(
+                  CreateActual.from(actual, rewindowActuals, view),
+                  rewindowActuals.<Integer>windowDummy(),
+                  checkerFn));
       return this;
     }
 
@@ -496,16 +577,22 @@ public class PAssert {
       extends PTransform<PBegin, PCollectionView<ActualT>> {
 
     private final transient PCollection<T> actual;
+    private final transient AssertionWindows rewindowActuals;
     private final transient PTransform<PCollection<T>, PCollectionView<ActualT>> actualView;
 
     public static <T, ActualT> CreateActual<T, ActualT> from(
-        PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
-      return new CreateActual<>(actual, actualView);
+        PCollection<T> actual,
+        AssertionWindows rewindowActuals,
+        PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
+      return new CreateActual<>(actual, rewindowActuals, actualView);
     }
 
     private CreateActual(
-        PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
+        PCollection<T> actual,
+        AssertionWindows rewindowActuals,
+        PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
       this.actual = actual;
+      this.rewindowActuals = rewindowActuals;
       this.actualView = actualView;
     }
 
@@ -513,7 +600,8 @@ public class PAssert {
     public PCollectionView<ActualT> apply(PBegin input) {
       final Coder<T> coder = actual.getCoder();
       return actual
-          .apply(Window.<T>into(new GlobalWindows()))
+          .apply("FilterActuals", rewindowActuals.<T>prepareActuals())
+          .apply("RewindowActuals", rewindowActuals.<T>windowActuals())
           .apply(
               ParDo.of(
                   new DoFn<T, T>() {
@@ -565,84 +653,83 @@ public class PAssert {
    * <p>If the {@link PCollection} is empty, this transform returns a {@link PCollection} containing
    * a single empty iterable, even though in practice most runners will not produce any element.
    */
-  private static class GroupGlobally<T> extends PTransform<PCollection<T>, PCollection<Iterable<T>>>
+  private static class GroupGlobally<T>
+      extends PTransform<PCollection<T>, PCollection<Iterable<WindowedValue<T>>>>
       implements Serializable {
+    private final AssertionWindows rewindowingStrategy;
 
-    public GroupGlobally() {}
+    public GroupGlobally(AssertionWindows rewindowingStrategy) {
+      this.rewindowingStrategy = rewindowingStrategy;
+    }
 
     @Override
-    public PCollection<Iterable<T>> apply(PCollection<T> input) {
-
-      final int contentsKey = 0;
-      final int dummyKey = 1;
+    public PCollection<Iterable<WindowedValue<T>>> apply(PCollection<T> input) {
       final int combinedKey = 42;
 
+      // Remove the triggering on both the grouped contents and the dummy PCollection
+      PTransform<
+              PCollection<KV<Integer, Iterable<WindowedValue<T>>>>,
+              PCollection<KV<Integer, Iterable<WindowedValue<T>>>>>
+          removeTriggering =
+              Window.<KV<Integer, Iterable<WindowedValue<T>>>>triggering(Never.ever())
+                  .discardingFiredPanes()
+                  .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness());
       // Group the contents by key. If it is empty, this PCollection will be empty, too.
       // Then key it again with a dummy key.
-      PCollection<KV<Integer, KV<Integer, Iterable<T>>>> doubleKeyedGroupedInput =
+      PCollection<KV<Integer, Iterable<WindowedValue<T>>>> groupedContents =
+          // TODO: Split the filtering from the rewindowing, and apply filtering before the Gather
+          // if the grouping of extra records proves to be too expensive.
           input
-              .apply("GloballyWindow", Window.<T>into(new GlobalWindows()))
-              .apply("ContentsWithKeys", WithKeys.<Integer, T>of(contentsKey))
+              .apply(rewindowingStrategy.<T>prepareActuals())
+              .apply("GatherAllOutputs", GatherAllPanes.<T>globally())
               .apply(
-                  "NeverTriggerContents",
-                  Window.<KV<Integer, T>>triggering(Never.ever()).discardingFiredPanes())
-              .apply("ContentsGBK", GroupByKey.<Integer, T>create())
-              .apply(
-                  "DoubleKeyContents", WithKeys.<Integer, KV<Integer, Iterable<T>>>of(combinedKey));
+                  "RewindowActuals",
+                  rewindowingStrategy.<Iterable<WindowedValue<T>>>windowActuals())
+              .apply("KeyForDummy", WithKeys.<Integer, Iterable<WindowedValue<T>>>of(combinedKey))
+              .apply("RemoveActualsTriggering", removeTriggering);
 
       // Create another non-empty PCollection that is keyed with a distinct dummy key
-      PCollection<KV<Integer, KV<Integer, Iterable<T>>>> doubleKeyedDummy =
+      PCollection<KV<Integer, Iterable<WindowedValue<T>>>> keyedDummy =
           input
               .getPipeline()
               .apply(
                   Create.of(
                           KV.of(
                               combinedKey,
-                              KV.of(dummyKey, (Iterable<T>) Collections.<T>emptyList())))
-                      .withCoder(doubleKeyedGroupedInput.getCoder()))
-              .setWindowingStrategyInternal(doubleKeyedGroupedInput.getWindowingStrategy());
+                              (Iterable<WindowedValue<T>>)
+                                  Collections.<WindowedValue<T>>emptyList()))
+                      .withCoder(groupedContents.getCoder()))
+              .apply(
+                  "WindowIntoDummy",
+                  rewindowingStrategy.<KV<Integer, Iterable<WindowedValue<T>>>>windowDummy())
+              .apply("RemoveDummyTriggering", removeTriggering);
 
       // Flatten them together and group by the combined key to get a single element
-      PCollection<KV<Integer, Iterable<KV<Integer, Iterable<T>>>>> dummyAndContents =
-          PCollectionList.<KV<Integer, KV<Integer, Iterable<T>>>>of(doubleKeyedGroupedInput)
-              .and(doubleKeyedDummy)
+      PCollection<KV<Integer, Iterable<Iterable<WindowedValue<T>>>>> dummyAndContents =
+          PCollectionList.of(groupedContents)
+              .and(keyedDummy)
               .apply(
                   "FlattenDummyAndContents",
-                  Flatten.<KV<Integer, KV<Integer, Iterable<T>>>>pCollections())
+                  Flatten.<KV<Integer, Iterable<WindowedValue<T>>>>pCollections())
               .apply(
-                  "GroupDummyAndContents", GroupByKey.<Integer, KV<Integer, Iterable<T>>>create());
+                  "NeverTrigger",
+                  Window.<KV<Integer, Iterable<WindowedValue<T>>>>triggering(Never.ever())
+                      .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
+                      .discardingFiredPanes())
+              .apply(
+                  "GroupDummyAndContents",
+                  GroupByKey.<Integer, Iterable<WindowedValue<T>>>create());
 
-      // Extract the contents if they exist else empty contents.
       return dummyAndContents
-          .apply(
-              "GetContents",
-              ParDo.of(
-                  new DoFn<KV<Integer, Iterable<KV<Integer, Iterable<T>>>>, Iterable<T>>() {
-                    @Override
-                    public void processElement(ProcessContext ctx) {
-                      Iterable<KV<Integer, Iterable<T>>> groupedDummyAndContents =
-                          ctx.element().getValue();
-
-                      if (Iterables.size(groupedDummyAndContents) == 1) {
-                        // Only the dummy value, so just output empty
-                        ctx.output(Collections.<T>emptyList());
-                      } else {
-                        checkState(
-                            Iterables.size(groupedDummyAndContents) == 2,
-                            "Internal error: PAssert grouped contents with a"
-                                + " dummy value resulted in more than 2 groupings: %s",
-                                groupedDummyAndContents);
-
-                        if (Iterables.get(groupedDummyAndContents, 0).getKey() == contentsKey) {
-                          // The first iterable in the group holds the real contents
-                          ctx.output(Iterables.get(groupedDummyAndContents, 0).getValue());
-                        } else {
-                          // The second iterable holds the real contents
-                          ctx.output(Iterables.get(groupedDummyAndContents, 1).getValue());
-                        }
-                      }
-                    }
-                  }));
+          .apply(Values.<Iterable<Iterable<WindowedValue<T>>>>create())
+          .apply(ParDo.of(new ConcatFn<WindowedValue<T>>()));
+    }
+  }
+
+  private static final class ConcatFn<T> extends DoFn<Iterable<Iterable<T>>, Iterable<T>> {
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(Iterables.concat(c.element()));
     }
   }
 
@@ -653,15 +740,20 @@ public class PAssert {
   public static class GroupThenAssert<T> extends PTransform<PCollection<T>, PDone>
       implements Serializable {
     private final SerializableFunction<Iterable<T>, Void> checkerFn;
+    private final AssertionWindows rewindowingStrategy;
 
-    private GroupThenAssert(SerializableFunction<Iterable<T>, Void> checkerFn) {
+    private GroupThenAssert(
+        SerializableFunction<Iterable<T>, Void> checkerFn, AssertionWindows rewindowingStrategy) {
       this.checkerFn = checkerFn;
+      this.rewindowingStrategy = rewindowingStrategy;
     }
 
     @Override
     public PDone apply(PCollection<T> input) {
       input
-          .apply("GroupGlobally", new GroupGlobally<T>())
+          .apply("GroupGlobally", new GroupGlobally<T>(rewindowingStrategy))
+          .apply("GetOnlyPane", ParDo.of(new ExtractOnlyPane<T>()))
+          .setCoder(IterableCoder.of(input.getCoder()))
           .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn)));
 
       return PDone.in(input.getPipeline());
@@ -675,15 +767,20 @@ public class PAssert {
   public static class GroupThenAssertForSingleton<T>
       extends PTransform<PCollection<Iterable<T>>, PDone> implements Serializable {
     private final SerializableFunction<Iterable<T>, Void> checkerFn;
+    private final AssertionWindows rewindowingStrategy;
 
-    private GroupThenAssertForSingleton(SerializableFunction<Iterable<T>, Void> checkerFn) {
+    private GroupThenAssertForSingleton(
+        SerializableFunction<Iterable<T>, Void> checkerFn, AssertionWindows rewindowingStrategy) {
       this.checkerFn = checkerFn;
+      this.rewindowingStrategy = rewindowingStrategy;
     }
 
     @Override
     public PDone apply(PCollection<Iterable<T>> input) {
       input
-          .apply("GroupGlobally", new GroupGlobally<Iterable<T>>())
+          .apply("GroupGlobally", new GroupGlobally<Iterable<T>>(rewindowingStrategy))
+          .apply("GetOnlyPane", ParDo.of(new ExtractOnlyPane<Iterable<T>>()))
+          .setCoder(IterableCoder.of(input.getCoder()))
           .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn)));
 
       return PDone.in(input.getPipeline());
@@ -703,12 +800,15 @@ public class PAssert {
   public static class OneSideInputAssert<ActualT> extends PTransform<PBegin, PDone>
       implements Serializable {
     private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
+    private final transient PTransform<PCollection<Integer>, PCollection<Integer>> windowToken;
     private final SerializableFunction<ActualT, Void> checkerFn;
 
     private OneSideInputAssert(
         PTransform<PBegin, PCollectionView<ActualT>> createActual,
+        PTransform<PCollection<Integer>, PCollection<Integer>> windowToken,
         SerializableFunction<ActualT, Void> checkerFn) {
       this.createActual = createActual;
+      this.windowToken = windowToken;
       this.checkerFn = checkerFn;
     }
 
@@ -718,7 +818,9 @@ public class PAssert {
 
       input
           .apply(Create.of(0).withCoder(VarIntCoder.of()))
-          .apply("RunChecks",
+          .apply("WindowToken", windowToken)
+          .apply(
+              "RunChecks",
               ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual)));
 
       return PDone.in(input.getPipeline());
@@ -760,6 +862,23 @@ public class PAssert {
     }
   }
 
+  /** Unwraps gathered values into a list, failing on any value not from a first-and-last pane. */
+  private static class ExtractOnlyPane<T> extends DoFn<Iterable<WindowedValue<T>>, Iterable<T>> {
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      List<T> outputs = new ArrayList<>();
+      for (WindowedValue<T> value : c.element()) {
+        checkState(value.getPane().isFirst() && value.getPane().isLast(),
+            "Expected elements to be produced by a trigger that fires at most once, but got "
+                + "a value in a pane that is %s. Actual Pane Info: %s",
+            value.getPane().isFirst() ? "not the last pane" : "not the first pane",
+            value.getPane());
+        outputs.add(value.getValue());
+      }
+      c.output(outputs);
+    }
+  }
+
   /**
    * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
    * the single iterable element of the input {@link PCollection} and adjusts counters and
@@ -948,4 +1067,120 @@ public class PAssert {
       return new AssertContainsInAnyOrder<T>(expectedElements);
     }
   }
+
+  ////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * A strategy for filtering and rewindowing the actual and dummy {@link PCollection PCollections}
+   * within a {@link PAssert}.
+   *
+   * <p>This must ensure that the windowing strategies of the output of {@link #windowActuals()} and
+   * {@link #windowDummy()} are compatible (and can be {@link Flatten Flattened}).
+   *
+   * <p>The {@link PCollection} produced by {@link #prepareActuals()} will be a parent (though not
+   * a direct parent) of the transform provided to {@link #windowActuals()}.
+   */
+  private interface AssertionWindows {
+    /**
+     * Returns a transform that assigns the dummy element into the appropriate
+     * {@link BoundedWindow windows}.
+     */
+    <T> PTransform<PCollection<T>, PCollection<T>> windowDummy();
+
+    /**
+     * Returns a transform that filters and reassigns windows of the actual elements if necessary.
+     */
+    <T> PTransform<PCollection<T>, PCollection<T>> prepareActuals();
+
+    /**
+     * Returns a transform that assigns the actual elements into the appropriate
+     * {@link BoundedWindow windows}. Will be called after {@link #prepareActuals()}.
+     */
+    <T> PTransform<PCollection<T>, PCollection<T>> windowActuals();
+  }
+
+  /**
+   * An {@link AssertionWindows} which assigns all elements to the {@link GlobalWindow}.
+   */
+  private static class IntoGlobalWindow implements AssertionWindows, Serializable {
+    public static AssertionWindows of() {
+      return new IntoGlobalWindow();
+    }
+
+    private <T> PTransform<PCollection<T>, PCollection<T>> window() {
+      return Window.into(new GlobalWindows());
+    }
+
+    @Override
+    public <T> PTransform<PCollection<T>, PCollection<T>> windowDummy() {
+      return window();
+    }
+
+    /**
+     * Rewindows all input elements into the {@link GlobalWindow}. This ensures that the result
+     * PCollection will contain all of the elements of the PCollection when the window is not
+     * specified.
+     */
+    @Override
+    public <T> PTransform<PCollection<T>, PCollection<T>> prepareActuals() {
+      return window();
+    }
+
+    @Override
+    public <T> PTransform<PCollection<T>, PCollection<T>> windowActuals() {
+      return window();
+    }
+  }
+
+  private static class IntoStaticWindows implements AssertionWindows {
+    private final StaticWindows windowFn;
+
+    public static AssertionWindows of(Coder<BoundedWindow> windowCoder, BoundedWindow window) {
+      return new IntoStaticWindows(StaticWindows.of(windowCoder, window));
+    }
+
+    private IntoStaticWindows(StaticWindows windowFn) {
+      this.windowFn = windowFn;
+    }
+
+    @Override
+    public <T> PTransform<PCollection<T>, PCollection<T>> windowDummy() {
+      return Window.into(windowFn);
+    }
+
+    @Override
+    public <T> PTransform<PCollection<T>, PCollection<T>> prepareActuals() {
+      return new FilterWindows<>(windowFn);
+    }
+
+    @Override
+    public <T> PTransform<PCollection<T>, PCollection<T>> windowActuals() {
+      return Window.into(windowFn.intoOnlyExisting());
+    }
+  }
+
+  /**
+   * A {@link PTransform} that keeps only elements in one of a static collection of windows.
+   */
+  private static final class FilterWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    private final StaticWindows windows;
+
+    public FilterWindows(StaticWindows windows) {
+      this.windows = windows;
+    }
+
+    @Override
+    public PCollection<T> apply(PCollection<T> input) {
+      return input.apply("FilterWindows", ParDo.of(new Fn()));
+    }
+
+    private class Fn extends DoFn<T, T> implements RequiresWindowAccess {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        if (windows.getWindows().contains(c.window())) {
+          c.output(c.element());
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f449881/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index fdc8719..bafd897 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.testing;
 
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -25,11 +27,21 @@ import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import com.google.common.collect.Iterables;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -148,6 +160,45 @@ public class PAssertTest implements Serializable {
   }
 
   /**
+   * A windowed {@link PAssert} about the contents of a {@link PCollection}
+   * is allowed to be verified by an arbitrary {@link SerializableFunction},
+   * restricted to a single window via {@code inWindow}.
+   */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testWindowedSerializablePredicate() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+
+    PCollection<NotSerializableObject> pcollection = pipeline
+        .apply(Create.timestamped(
+            TimestampedValue.of(new NotSerializableObject(), new Instant(250L)),
+            TimestampedValue.of(new NotSerializableObject(), new Instant(500L)))
+            .withCoder(NotSerializableObjectCoder.of()))
+        .apply(Window.<NotSerializableObject>into(FixedWindows.of(Duration.millis(300L))));
+
+    PAssert.that(pcollection)
+        .inWindow(new IntervalWindow(new Instant(0L), new Instant(300L)))
+        .satisfies(new SerializableFunction<Iterable<NotSerializableObject>, Void>() {
+          @Override
+          public Void apply(Iterable<NotSerializableObject> contents) {
+            assertThat(Iterables.isEmpty(contents), is(false));
+            return null; // no problem!
+          }
+        });
+    PAssert.that(pcollection)
+        .inWindow(new IntervalWindow(new Instant(300L), new Instant(600L)))
+        .satisfies(new SerializableFunction<Iterable<NotSerializableObject>, Void>() {
+          @Override
+          public Void apply(Iterable<NotSerializableObject> contents) {
+            assertThat(Iterables.isEmpty(contents), is(false));
+            return null; // no problem!
+          }
+        });
+
+    pipeline.run();
+  }
+
+  /**
    * Test that we throw an error at pipeline construction time when the user mistakenly uses
    * {@code PAssert.thatSingleton().equals()} instead of the test method {@code .isEqualTo}.
    */
@@ -220,6 +271,26 @@ public class PAssertTest implements Serializable {
   }
 
   /**
+   * Basic test for {@code isEqualTo} restricted to a single window via {@code inWindow}.
+   */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testWindowedIsEqualTo() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<Integer> pcollection =
+        pipeline.apply(Create.timestamped(TimestampedValue.of(43, new Instant(250L)),
+            TimestampedValue.of(22, new Instant(-250L))))
+            .apply(Window.<Integer>into(FixedWindows.of(Duration.millis(500L))));
+    PAssert.thatSingleton(pcollection)
+        .inWindow(new IntervalWindow(new Instant(0L), new Instant(500L)))
+        .isEqualTo(43);
+    PAssert.thatSingleton(pcollection)
+        .inWindow(new IntervalWindow(new Instant(-500L), new Instant(0L)))
+        .isEqualTo(22);
+    pipeline.run();
+  }
+
+  /**
    * Basic test for {@code notEqualTo}.
    */
   @Test
@@ -244,6 +315,51 @@ public class PAssertTest implements Serializable {
   }
 
   /**
+   * Tests that {@code containsInAnyOrder} is order-independent within the {@link GlobalWindow}.
+   */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testGlobalWindowContainsInAnyOrder() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
+    PAssert.that(pcollection).inWindow(GlobalWindow.INSTANCE).containsInAnyOrder(2, 1, 4, 3);
+    pipeline.run();
+  }
+
+  /**
+   * Tests that windowed {@code containsInAnyOrder} is actually order-independent.
+   */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testWindowedContainsInAnyOrder() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<Integer> pcollection =
+        pipeline.apply(Create.timestamped(TimestampedValue.of(1, new Instant(100L)),
+            TimestampedValue.of(2, new Instant(200L)),
+            TimestampedValue.of(3, new Instant(300L)),
+            TimestampedValue.of(4, new Instant(400L))))
+            .apply(Window.<Integer>into(SlidingWindows.of(Duration.millis(200L))
+                .every(Duration.millis(100L))
+                .withOffset(Duration.millis(50L))));
+
+    PAssert.that(pcollection)
+        .inWindow(new IntervalWindow(new Instant(-50L), new Instant(150L))).containsInAnyOrder(1);
+    PAssert.that(pcollection)
+        .inWindow(new IntervalWindow(new Instant(50L), new Instant(250L)))
+        .containsInAnyOrder(2, 1);
+    PAssert.that(pcollection)
+        .inWindow(new IntervalWindow(new Instant(150L), new Instant(350L)))
+        .containsInAnyOrder(2, 3);
+    PAssert.that(pcollection)
+        .inWindow(new IntervalWindow(new Instant(250L), new Instant(450L)))
+        .containsInAnyOrder(4, 3);
+    PAssert.that(pcollection)
+        .inWindow(new IntervalWindow(new Instant(350L), new Instant(550L)))
+        .containsInAnyOrder(4);
+    pipeline.run();
+  }
+
+  /**
    * Tests that {@code containsInAnyOrder} fails when and how it should.
    */
   @Test