You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/24 21:24:08 UTC

[1/5] incubator-beam git commit: Add StaticWindows

Repository: incubator-beam
Updated Branches:
  refs/heads/master bd21ead63 -> ff72e27fc


Add StaticWindows

This is a WindowFn that ignores the input element and always assigns it to
the same static set of windows. It returns the provided window as the side
input window if and only if that window is present within its set of
windows.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a2ab828a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a2ab828a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a2ab828a

Branch: refs/heads/master
Commit: a2ab828ac3d69965d7a08d862751ae068f3189cc
Parents: 41faee4
Author: Thomas Groh <tg...@google.com>
Authored: Tue Jun 21 10:44:43 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 24 10:34:24 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/testing/StaticWindows.java  | 110 +++++++++++++++++++
 .../apache/beam/sdk/testing/WindowSupplier.java |  83 ++++++++++++++
 .../beam/sdk/testing/StaticWindowsTest.java     |  94 ++++++++++++++++
 .../beam/sdk/testing/WindowSupplierTest.java    |  89 +++++++++++++++
 4 files changed, 376 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a2ab828a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
new file mode 100644
index 0000000..08d2355
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+/**
+ * A {@link WindowFn} that places every element into one fixed collection of
+ * {@link BoundedWindow BoundedWindows}, regardless of the element itself. When used to window a
+ * side input, only main input windows contained in that collection are supported.
+ */
+final class StaticWindows extends NonMergingWindowFn<Object, BoundedWindow> {
+  private final Supplier<Collection<BoundedWindow>> windows;
+  private final Coder<BoundedWindow> coder;
+  private final boolean onlyExisting;
+
+  private StaticWindows(
+      Supplier<Collection<BoundedWindow>> windows,
+      Coder<BoundedWindow> coder,
+      boolean onlyExisting) {
+    this.windows = windows;
+    this.coder = coder;
+    this.onlyExisting = onlyExisting;
+  }
+
+  /** Creates a {@link StaticWindows} over the provided, non-empty {@code windows}. */
+  public static <W extends BoundedWindow> StaticWindows of(Coder<W> coder, Iterable<W> windows) {
+    checkArgument(!Iterables.isEmpty(windows), "Input windows to StaticWindows may not be empty");
+    @SuppressWarnings("unchecked")
+    Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) coder;
+    @SuppressWarnings("unchecked")
+    Iterable<BoundedWindow> boundedWindows = (Iterable<BoundedWindow>) windows;
+    return new StaticWindows(WindowSupplier.of(windowCoder, boundedWindows), windowCoder, false);
+  }
+
+  /** Creates a {@link StaticWindows} over the single provided {@code window}. */
+  public static <W extends BoundedWindow> StaticWindows of(Coder<W> coder, W window) {
+    return of(coder, Collections.singleton(window));
+  }
+
+  /** Returns a copy that keeps each element in its current window, which must be a static one. */
+  public StaticWindows intoOnlyExisting() {
+    return new StaticWindows(windows, coder, true);
+  }
+
+  /** Returns the static collection of windows held by this {@link WindowFn}. */
+  public Collection<BoundedWindow> getWindows() {
+    return windows.get();
+  }
+
+  @Override
+  public Collection<BoundedWindow> assignWindows(AssignContext c) throws Exception {
+    // Unless restricted to existing windows, every element goes into all of the static windows.
+    if (!onlyExisting) {
+      return getWindows();
+    }
+    checkArgument(
+        windows.get().contains(c.window()),
+        "Tried to assign windows to an element that is not already windowed into a provided "
+            + "window when onlyExisting is set to true");
+    return Collections.singleton(c.window());
+  }
+
+  @Override
+  public boolean isCompatible(WindowFn<?, ?> other) {
+    // Only another StaticWindows over an equal collection of windows is compatible.
+    return other instanceof StaticWindows
+        && Objects.equals(windows.get(), ((StaticWindows) other).windows.get());
+  }
+
+  @Override
+  public Coder<BoundedWindow> windowCoder() {
+    return coder;
+  }
+
+  @Override
+  public BoundedWindow getSideInputWindow(BoundedWindow window) {
+    // A main input window maps to itself, provided it is one of the static windows.
+    checkArgument(windows.get().contains(window),
+        "StaticWindows only supports side input windows for main input windows that it contains");
+    return window;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a2ab828a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowSupplier.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowSupplier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowSupplier.java
new file mode 100644
index 0000000..62bc09f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowSupplier.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CoderUtils;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * A {@link Supplier} that returns a static set of {@link BoundedWindow BoundedWindows}. The
+ * supplier is {@link Serializable}: it holds the windows encoded with the provided {@link Coder}
+ * and decodes them on the first call to {@link #get()}, which is safe to call concurrently.
+ */
+final class WindowSupplier implements Supplier<Collection<BoundedWindow>>, Serializable {
+  private final Coder<? extends BoundedWindow> coder;
+  private final Collection<byte[]> encodedWindows;
+  // Decoded on demand from encodedWindows; volatile so the unlocked check in get() is safe.
+  private transient volatile Collection<BoundedWindow> windows;
+
+  public static <W extends BoundedWindow> WindowSupplier of(Coder<W> coder, Iterable<W> windows) {
+    ImmutableSet.Builder<byte[]> windowsBuilder = ImmutableSet.builder();
+    for (W window : windows) {
+      try {
+        windowsBuilder.add(CoderUtils.encodeToByteArray(coder, window));
+      } catch (CoderException e) {
+        throw new IllegalArgumentException(
+            "Could not encode provided windows with the provided window coder", e);
+      }
+    }
+    return new WindowSupplier(coder, windowsBuilder.build());
+  }
+
+  private WindowSupplier(Coder<? extends BoundedWindow> coder, Collection<byte[]> encodedWindows) {
+    this.coder = coder;
+    this.encodedWindows = encodedWindows;
+  }
+
+  @Override
+  public Collection<BoundedWindow> get() {
+    if (windows == null) {
+      decodeWindows();
+    }
+    return windows;
+  }
+
+  private synchronized void decodeWindows() {
+    if (windows == null) {
+      ImmutableList.Builder<BoundedWindow> windowsBuilder = ImmutableList.builder();
+      for (byte[] encoded : encodedWindows) {
+        try {
+          windowsBuilder.add(CoderUtils.decodeFromByteArray(coder, encoded));
+        } catch (CoderException e) {
+          throw new IllegalArgumentException(
+              "Could not decode provided windows with the provided window coder", e);
+        }
+      }
+      this.windows = windowsBuilder.build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a2ab828a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
new file mode 100644
index 0000000..fd715dc
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+
+import com.google.common.collect.ImmutableList;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link StaticWindows}.
+ */
+@RunWith(JUnit4.class)
+public class StaticWindowsTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  // Two disjoint windows; the second ends at the global window's maximum timestamp.
+  private final IntervalWindow first = new IntervalWindow(new Instant(0), new Instant(100_000L));
+  private final IntervalWindow second =
+      new IntervalWindow(new Instant(1_000_000L), GlobalWindow.INSTANCE.maxTimestamp());
+
+  @Test
+  public void singleWindowSucceeds() throws Exception {
+    WindowFn<Object, BoundedWindow> windowFn = StaticWindows.of(IntervalWindow.getCoder(), first);
+    assertThat(WindowFnTestUtils.assignedWindows(windowFn, 100L),
+        Matchers.<BoundedWindow>contains(first));
+    assertThat(WindowFnTestUtils.assignedWindows(windowFn, -100L),
+        Matchers.<BoundedWindow>contains(first));
+  }
+
+  @Test
+  public void multipleWindowsSucceeds() throws Exception {
+    WindowFn<Object, BoundedWindow> windowFn =
+        StaticWindows.of(IntervalWindow.getCoder(), ImmutableList.of(first, second));
+    // Every element is assigned to both windows, whatever its timestamp.
+    assertThat(WindowFnTestUtils.assignedWindows(windowFn, 100L),
+        Matchers.<BoundedWindow>containsInAnyOrder(first, second));
+    assertThat(WindowFnTestUtils.assignedWindows(windowFn, 1_000_000_000L),
+        Matchers.<BoundedWindow>containsInAnyOrder(first, second));
+    assertThat(WindowFnTestUtils.assignedWindows(windowFn, -100L),
+        Matchers.<BoundedWindow>containsInAnyOrder(first, second));
+  }
+
+  @Test
+  public void getSideInputWindowIdentity() {
+    WindowFn<Object, BoundedWindow> windowFn =
+        StaticWindows.of(IntervalWindow.getCoder(), ImmutableList.of(first, second));
+    assertThat(windowFn.getSideInputWindow(first), Matchers.<BoundedWindow>equalTo(first));
+    assertThat(windowFn.getSideInputWindow(second), Matchers.<BoundedWindow>equalTo(second));
+  }
+
+  @Test
+  public void getSideInputWindowNotPresent() {
+    WindowFn<Object, BoundedWindow> windowFn =
+        StaticWindows.of(IntervalWindow.getCoder(), ImmutableList.of(second));
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("contains");
+    windowFn.getSideInputWindow(first);
+  }
+
+  @Test
+  public void emptyIterableThrows() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("may not be empty");
+    StaticWindows.of(GlobalWindow.Coder.INSTANCE, ImmutableList.<GlobalWindow>of());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a2ab828a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
new file mode 100644
index 0000000..178c67c
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.SerializableUtils;
+
+import com.google.common.collect.ImmutableList;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+
+/**
+ * Tests for {@link WindowSupplier}.
+ */
+public class WindowSupplierTest {
+  private final IntervalWindow window = new IntervalWindow(new Instant(0L), new Instant(100L));
+  private final IntervalWindow otherWindow =
+      new IntervalWindow(new Instant(-100L), new Instant(100L));
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void getReturnsProvidedWindows() {
+    assertThat(
+        WindowSupplier.of(IntervalWindow.getCoder(), ImmutableList.of(window, otherWindow)).get(),
+        Matchers.<BoundedWindow>containsInAnyOrder(otherWindow, window));
+  }
+
+  @Test
+  public void getAfterSerialization() {
+    WindowSupplier supplier =
+        WindowSupplier.of(IntervalWindow.getCoder(), ImmutableList.of(window, otherWindow));
+    assertThat(
+        SerializableUtils.clone(supplier).get(),
+        Matchers.<BoundedWindow>containsInAnyOrder(otherWindow, window));
+  }
+
+  @Test
+  public void unencodableWindowFails() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Could not encode");
+    WindowSupplier.of(
+        new FailingCoder(),
+        Collections.<BoundedWindow>singleton(window));
+  }
+
+  /** A coder that fails every encode and decode call with a {@link CoderException}. */
+  private static class FailingCoder extends AtomicCoder<BoundedWindow> {
+    @Override
+    public void encode(BoundedWindow value, OutputStream outStream, Context context)
+        throws CoderException, IOException {
+      throw new CoderException("Test Encode Exception");
+    }
+
+    @Override
+    public BoundedWindow decode(
+        InputStream inStream, Context context) throws CoderException, IOException {
+      throw new CoderException("Test Decode Exception");
+    }
+  }
+}


[4/5] incubator-beam git commit: Use GatherAllPanes in PAssert

Posted by ke...@apache.org.
Use GatherAllPanes in PAssert

Instead of explicitly grouping by key, gather all the panes across the
input window.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1f449881
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1f449881
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1f449881

Branch: refs/heads/master
Commit: 1f449881a88d9927b507ad46d0004e3c9805513a
Parents: ec1bb3a
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jun 20 14:38:11 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 24 13:33:13 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/testing/PAssert.java    | 369 +++++++++++++++----
 .../apache/beam/sdk/testing/PAssertTest.java    | 116 ++++++
 2 files changed, 418 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f449881/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 1a3d85d..883b2b3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.testing;
 
 import static com.google.common.base.Preconditions.checkState;
-
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
@@ -36,18 +35,24 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Never;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.GatherAllPanes;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -63,9 +68,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
@@ -116,6 +123,14 @@ public class PAssert {
    * Builder interface for assertions applicable to iterables and PCollection contents.
    */
   public interface IterableAssert<T> {
+    /**
+     * Creates a new {@link IterableAssert} like this one, but with the assertion restricted to only
+     * run on the provided window.
+     *
+     * @return a new {@link IterableAssert} like this one but with the assertion only applied to the
+     * specified window.
+     */
+    IterableAssert<T> inWindow(BoundedWindow window);
 
     /**
      * Asserts that the iterable in question contains the provided elements.
@@ -152,6 +167,15 @@ public class PAssert {
    */
   public interface SingletonAssert<T> {
     /**
+     * Creates a new {@link SingletonAssert} like this one, but with the assertion restricted to
+     * only run on the provided window.
+     *
+     * @return a new {@link SingletonAssert} like this one but with the assertion only applied to
+     * the specified window.
+     */
+    SingletonAssert<T> inWindow(BoundedWindow window);
+
+    /**
      * Asserts that the value in question is equal to the provided value, according to
      * {@link Object#equals}.
      *
@@ -250,9 +274,23 @@ public class PAssert {
    */
   private static class PCollectionContentsAssert<T> implements IterableAssert<T> {
     private final PCollection<T> actual;
+    private final AssertionWindows rewindowingStrategy;
 
     public PCollectionContentsAssert(PCollection<T> actual) {
+      this(actual, IntoGlobalWindow.<T>of());
+    }
+
+    public PCollectionContentsAssert(PCollection<T> actual, AssertionWindows rewindowingStrategy) {
       this.actual = actual;
+      this.rewindowingStrategy = rewindowingStrategy;
+    }
+
+    @Override
+    public PCollectionContentsAssert<T> inWindow(BoundedWindow window) {
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      Coder<BoundedWindow> windowCoder =
+          (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
+      return new PCollectionContentsAssert<>(actual, IntoStaticWindows.<T>of(windowCoder, window));
     }
 
     /**
@@ -285,7 +323,7 @@ public class PAssert {
     @Override
     public PCollectionContentsAssert<T> satisfies(
         SerializableFunction<Iterable<T>, Void> checkerFn) {
-      actual.apply(nextAssertionName(), new GroupThenAssert<>(checkerFn));
+      actual.apply(nextAssertionName(), new GroupThenAssert<>(checkerFn, rewindowingStrategy));
       return this;
     }
 
@@ -325,7 +363,8 @@ public class PAssert {
       @SuppressWarnings({"rawtypes", "unchecked"})
       SerializableFunction<Iterable<T>, Void> checkerFn =
           (SerializableFunction) new MatcherCheckerFn<>(matcher);
-      actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn));
+      actual.apply(
+          "PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn, rewindowingStrategy));
       return this;
     }
 
@@ -374,13 +413,30 @@ public class PAssert {
   private static class PCollectionSingletonIterableAssert<T> implements IterableAssert<T> {
     private final PCollection<Iterable<T>> actual;
     private final Coder<T> elementCoder;
+    private final AssertionWindows rewindowingStrategy;
 
     public PCollectionSingletonIterableAssert(PCollection<Iterable<T>> actual) {
+      this(actual, IntoGlobalWindow.<Iterable<T>>of());
+    }
+
+    public PCollectionSingletonIterableAssert(
+        PCollection<Iterable<T>> actual, AssertionWindows rewindowingStrategy) {
       this.actual = actual;
 
       @SuppressWarnings("unchecked")
       Coder<T> typedCoder = (Coder<T>) actual.getCoder().getCoderArguments().get(0);
       this.elementCoder = typedCoder;
+
+      this.rewindowingStrategy = rewindowingStrategy;
+    }
+
+    @Override
+    public PCollectionSingletonIterableAssert<T> inWindow(BoundedWindow window) {
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      Coder<BoundedWindow> windowCoder =
+          (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
+      return new PCollectionSingletonIterableAssert<>(
+          actual, IntoStaticWindows.<Iterable<T>>of(windowCoder, window));
     }
 
     @Override
@@ -402,7 +458,9 @@ public class PAssert {
     @Override
     public PCollectionSingletonIterableAssert<T> satisfies(
         SerializableFunction<Iterable<T>, Void> checkerFn) {
-      actual.apply("PAssert$" + (assertCount++), new GroupThenAssertForSingleton<>(checkerFn));
+      actual.apply(
+          "PAssert$" + (assertCount++),
+          new GroupThenAssertForSingleton<>(checkerFn, rewindowingStrategy));
       return this;
     }
 
@@ -421,18 +479,38 @@ public class PAssert {
   private static class PCollectionViewAssert<ElemT, ViewT> implements SingletonAssert<ViewT> {
     private final PCollection<ElemT> actual;
     private final PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view;
+    private final AssertionWindows rewindowActuals;
     private final Coder<ViewT> coder;
 
     protected PCollectionViewAssert(
         PCollection<ElemT> actual,
         PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view,
         Coder<ViewT> coder) {
+      this(actual, view, IntoGlobalWindow.<ElemT>of(), coder);
+    }
+
+    private PCollectionViewAssert(
+        PCollection<ElemT> actual,
+        PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view,
+        AssertionWindows rewindowActuals,
+        Coder<ViewT> coder) {
       this.actual = actual;
       this.view = view;
+      this.rewindowActuals = rewindowActuals;
       this.coder = coder;
     }
 
     @Override
+    public PCollectionViewAssert<ElemT, ViewT> inWindow(BoundedWindow window) {
+      return new PCollectionViewAssert<>(
+          actual,
+          view,
+          IntoStaticWindows.of(
+              (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder(), window),
+          coder);
+    }
+
+    @Override
     public PCollectionViewAssert<ElemT, ViewT> isEqualTo(ViewT expectedValue) {
       return satisfies(new AssertIsEqualToRelation<ViewT>(), expectedValue);
     }
@@ -449,7 +527,10 @@ public class PAssert {
           .getPipeline()
           .apply(
               "PAssert$" + (assertCount++),
-              new OneSideInputAssert<ViewT>(CreateActual.from(actual, view), checkerFn));
+              new OneSideInputAssert<ViewT>(
+                  CreateActual.from(actual, rewindowActuals, view),
+                  rewindowActuals.<Integer>windowDummy(),
+                  checkerFn));
       return this;
     }
 
@@ -496,16 +577,22 @@ public class PAssert {
       extends PTransform<PBegin, PCollectionView<ActualT>> {
 
     private final transient PCollection<T> actual;
+    private final transient AssertionWindows rewindowActuals;
     private final transient PTransform<PCollection<T>, PCollectionView<ActualT>> actualView;
 
     public static <T, ActualT> CreateActual<T, ActualT> from(
-        PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
-      return new CreateActual<>(actual, actualView);
+        PCollection<T> actual,
+        AssertionWindows rewindowActuals,
+        PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
+      return new CreateActual<>(actual, rewindowActuals, actualView);
     }
 
     private CreateActual(
-        PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
+        PCollection<T> actual,
+        AssertionWindows rewindowActuals,
+        PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
       this.actual = actual;
+      this.rewindowActuals = rewindowActuals;
       this.actualView = actualView;
     }
 
@@ -513,7 +600,8 @@ public class PAssert {
     public PCollectionView<ActualT> apply(PBegin input) {
       final Coder<T> coder = actual.getCoder();
       return actual
-          .apply(Window.<T>into(new GlobalWindows()))
+          .apply("FilterActuals", rewindowActuals.<T>prepareActuals())
+          .apply("RewindowActuals", rewindowActuals.<T>windowActuals())
           .apply(
               ParDo.of(
                   new DoFn<T, T>() {
@@ -565,84 +653,83 @@ public class PAssert {
    * <p>If the {@link PCollection} is empty, this transform returns a {@link PCollection} containing
    * a single empty iterable, even though in practice most runners will not produce any element.
    */
-  private static class GroupGlobally<T> extends PTransform<PCollection<T>, PCollection<Iterable<T>>>
+  private static class GroupGlobally<T>
+      extends PTransform<PCollection<T>, PCollection<Iterable<WindowedValue<T>>>>
       implements Serializable {
+    private final AssertionWindows rewindowingStrategy;
 
-    public GroupGlobally() {}
+    public GroupGlobally(AssertionWindows rewindowingStrategy) {
+      this.rewindowingStrategy = rewindowingStrategy;
+    }
 
     @Override
-    public PCollection<Iterable<T>> apply(PCollection<T> input) {
-
-      final int contentsKey = 0;
-      final int dummyKey = 1;
+    public PCollection<Iterable<WindowedValue<T>>> apply(PCollection<T> input) {
       final int combinedKey = 42;
 
+      // Remove the triggering on both
+      PTransform<
+              PCollection<KV<Integer, Iterable<WindowedValue<T>>>>,
+              PCollection<KV<Integer, Iterable<WindowedValue<T>>>>>
+          removeTriggering =
+              Window.<KV<Integer, Iterable<WindowedValue<T>>>>triggering(Never.ever())
+                  .discardingFiredPanes()
+                  .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness());
       // Group the contents by key. If it is empty, this PCollection will be empty, too.
       // Then key it again with a dummy key.
-      PCollection<KV<Integer, KV<Integer, Iterable<T>>>> doubleKeyedGroupedInput =
+      PCollection<KV<Integer, Iterable<WindowedValue<T>>>> groupedContents =
+          // TODO: Split the filtering from the rewindowing, and apply filtering before the Gather
+          // if the grouping of extra records
           input
-              .apply("GloballyWindow", Window.<T>into(new GlobalWindows()))
-              .apply("ContentsWithKeys", WithKeys.<Integer, T>of(contentsKey))
+              .apply(rewindowingStrategy.<T>prepareActuals())
+              .apply("GatherAllOutputs", GatherAllPanes.<T>globally())
               .apply(
-                  "NeverTriggerContents",
-                  Window.<KV<Integer, T>>triggering(Never.ever()).discardingFiredPanes())
-              .apply("ContentsGBK", GroupByKey.<Integer, T>create())
-              .apply(
-                  "DoubleKeyContents", WithKeys.<Integer, KV<Integer, Iterable<T>>>of(combinedKey));
+                  "RewindowActuals",
+                  rewindowingStrategy.<Iterable<WindowedValue<T>>>windowActuals())
+              .apply("KeyForDummy", WithKeys.<Integer, Iterable<WindowedValue<T>>>of(combinedKey))
+              .apply("RemoveActualsTriggering", removeTriggering);
 
       // Create another non-empty PCollection that is keyed with a distinct dummy key
-      PCollection<KV<Integer, KV<Integer, Iterable<T>>>> doubleKeyedDummy =
+      PCollection<KV<Integer, Iterable<WindowedValue<T>>>> keyedDummy =
           input
               .getPipeline()
               .apply(
                   Create.of(
                           KV.of(
                               combinedKey,
-                              KV.of(dummyKey, (Iterable<T>) Collections.<T>emptyList())))
-                      .withCoder(doubleKeyedGroupedInput.getCoder()))
-              .setWindowingStrategyInternal(doubleKeyedGroupedInput.getWindowingStrategy());
+                              (Iterable<WindowedValue<T>>)
+                                  Collections.<WindowedValue<T>>emptyList()))
+                      .withCoder(groupedContents.getCoder()))
+              .apply(
+                  "WindowIntoDummy",
+                  rewindowingStrategy.<KV<Integer, Iterable<WindowedValue<T>>>>windowDummy())
+              .apply("RemoveDummyTriggering", removeTriggering);
 
       // Flatten them together and group by the combined key to get a single element
-      PCollection<KV<Integer, Iterable<KV<Integer, Iterable<T>>>>> dummyAndContents =
-          PCollectionList.<KV<Integer, KV<Integer, Iterable<T>>>>of(doubleKeyedGroupedInput)
-              .and(doubleKeyedDummy)
+      PCollection<KV<Integer, Iterable<Iterable<WindowedValue<T>>>>> dummyAndContents =
+          PCollectionList.of(groupedContents)
+              .and(keyedDummy)
               .apply(
                   "FlattenDummyAndContents",
-                  Flatten.<KV<Integer, KV<Integer, Iterable<T>>>>pCollections())
+                  Flatten.<KV<Integer, Iterable<WindowedValue<T>>>>pCollections())
               .apply(
-                  "GroupDummyAndContents", GroupByKey.<Integer, KV<Integer, Iterable<T>>>create());
+                  "NeverTrigger",
+                  Window.<KV<Integer, Iterable<WindowedValue<T>>>>triggering(Never.ever())
+                      .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
+                      .discardingFiredPanes())
+              .apply(
+                  "GroupDummyAndContents",
+                  GroupByKey.<Integer, Iterable<WindowedValue<T>>>create());
 
-      // Extract the contents if they exist else empty contents.
       return dummyAndContents
-          .apply(
-              "GetContents",
-              ParDo.of(
-                  new DoFn<KV<Integer, Iterable<KV<Integer, Iterable<T>>>>, Iterable<T>>() {
-                    @Override
-                    public void processElement(ProcessContext ctx) {
-                      Iterable<KV<Integer, Iterable<T>>> groupedDummyAndContents =
-                          ctx.element().getValue();
-
-                      if (Iterables.size(groupedDummyAndContents) == 1) {
-                        // Only the dummy value, so just output empty
-                        ctx.output(Collections.<T>emptyList());
-                      } else {
-                        checkState(
-                            Iterables.size(groupedDummyAndContents) == 2,
-                            "Internal error: PAssert grouped contents with a"
-                                + " dummy value resulted in more than 2 groupings: %s",
-                                groupedDummyAndContents);
-
-                        if (Iterables.get(groupedDummyAndContents, 0).getKey() == contentsKey) {
-                          // The first iterable in the group holds the real contents
-                          ctx.output(Iterables.get(groupedDummyAndContents, 0).getValue());
-                        } else {
-                          // The second iterable holds the real contents
-                          ctx.output(Iterables.get(groupedDummyAndContents, 1).getValue());
-                        }
-                      }
-                    }
-                  }));
+          .apply(Values.<Iterable<Iterable<WindowedValue<T>>>>create())
+          .apply(ParDo.of(new ConcatFn<WindowedValue<T>>()));
+    }
+  }
+
+  private static final class ConcatFn<T> extends DoFn<Iterable<Iterable<T>>, Iterable<T>> {
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(Iterables.concat(c.element()));
     }
   }
 
@@ -653,15 +740,20 @@ public class PAssert {
   public static class GroupThenAssert<T> extends PTransform<PCollection<T>, PDone>
       implements Serializable {
     private final SerializableFunction<Iterable<T>, Void> checkerFn;
+    private final AssertionWindows rewindowingStrategy;
 
-    private GroupThenAssert(SerializableFunction<Iterable<T>, Void> checkerFn) {
+    private GroupThenAssert(
+        SerializableFunction<Iterable<T>, Void> checkerFn, AssertionWindows rewindowingStrategy) {
       this.checkerFn = checkerFn;
+      this.rewindowingStrategy = rewindowingStrategy;
     }
 
     @Override
     public PDone apply(PCollection<T> input) {
       input
-          .apply("GroupGlobally", new GroupGlobally<T>())
+          .apply("GroupGlobally", new GroupGlobally<T>(rewindowingStrategy))
+          .apply("GetOnlyPane", ParDo.of(new ExtractOnlyPane<T>()))
+          .setCoder(IterableCoder.of(input.getCoder()))
           .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn)));
 
       return PDone.in(input.getPipeline());
@@ -675,15 +767,20 @@ public class PAssert {
   public static class GroupThenAssertForSingleton<T>
       extends PTransform<PCollection<Iterable<T>>, PDone> implements Serializable {
     private final SerializableFunction<Iterable<T>, Void> checkerFn;
+    private final AssertionWindows rewindowingStrategy;
 
-    private GroupThenAssertForSingleton(SerializableFunction<Iterable<T>, Void> checkerFn) {
+    private GroupThenAssertForSingleton(
+        SerializableFunction<Iterable<T>, Void> checkerFn, AssertionWindows rewindowingStrategy) {
       this.checkerFn = checkerFn;
+      this.rewindowingStrategy = rewindowingStrategy;
     }
 
     @Override
     public PDone apply(PCollection<Iterable<T>> input) {
       input
-          .apply("GroupGlobally", new GroupGlobally<Iterable<T>>())
+          .apply("GroupGlobally", new GroupGlobally<Iterable<T>>(rewindowingStrategy))
+          .apply("GetOnlyPane", ParDo.of(new ExtractOnlyPane<Iterable<T>>()))
+          .setCoder(IterableCoder.of(input.getCoder()))
           .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn)));
 
       return PDone.in(input.getPipeline());
@@ -703,12 +800,15 @@ public class PAssert {
   public static class OneSideInputAssert<ActualT> extends PTransform<PBegin, PDone>
       implements Serializable {
     private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
+    private final transient PTransform<PCollection<Integer>, PCollection<Integer>> windowToken;
     private final SerializableFunction<ActualT, Void> checkerFn;
 
     private OneSideInputAssert(
         PTransform<PBegin, PCollectionView<ActualT>> createActual,
+        PTransform<PCollection<Integer>, PCollection<Integer>> windowToken,
         SerializableFunction<ActualT, Void> checkerFn) {
       this.createActual = createActual;
+      this.windowToken = windowToken;
       this.checkerFn = checkerFn;
     }
 
@@ -718,7 +818,9 @@ public class PAssert {
 
       input
           .apply(Create.of(0).withCoder(VarIntCoder.of()))
-          .apply("RunChecks",
+          .apply("WindowToken", windowToken)
+          .apply(
+              "RunChecks",
               ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual)));
 
       return PDone.in(input.getPipeline());
@@ -760,6 +862,23 @@ public class PAssert {
     }
   }
 
+  private static class ExtractOnlyPane<T> extends DoFn<Iterable<WindowedValue<T>>, Iterable<T>> {
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      List<T> outputs = new ArrayList<>(); // unwrapped values, in the input's iteration order
+      for (WindowedValue<T> value : c.element()) {
+        checkState( // fails on any value whose pane is not both the first and the last
+            value.getPane().isFirst() && value.getPane().isLast(),
+            "Expected elements to be produced by a trigger that fires at most once, but got"
+                + " a value in a pane that is %s. Actual Pane Info: %s",
+            value.getPane().isFirst() ? "not the last pane" : "not the first pane",
+            value.getPane());
+        outputs.add(value.getValue());
+      }
+      c.output(outputs);
+    }
+  }
+
   /**
    * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
    * the single iterable element of the input {@link PCollection} and adjusts counters and
@@ -948,4 +1067,120 @@ public class PAssert {
       return new AssertContainsInAnyOrder<T>(expectedElements);
     }
   }
+
+  ////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * A strategy for filtering and rewindowing the actual and dummy {@link PCollection PCollections}
+   * within a {@link PAssert}.
+   *
+   * <p>This must ensure that the windowing strategies of the output of {@link #windowActuals()} and
+   * {@link #windowDummy()} are compatible (and can be {@link Flatten Flattened}).
+   *
+   * <p>The {@link PCollection} produced by {@link #prepareActuals()} will be a parent (though not
+   * a direct parent) of the transform provided to {@link #windowActuals()}.
+   */
+  private interface AssertionWindows {
+    /**
+     * Returns a transform that assigns the dummy element into the appropriate
+     * {@link BoundedWindow windows}.
+     */
+    <T> PTransform<PCollection<T>, PCollection<T>> windowDummy();
+
+    /**
+     * Returns a transform that filters and reassigns windows of the actual elements if necessary.
+     */
+    <T> PTransform<PCollection<T>, PCollection<T>> prepareActuals();
+
+    /**
+     * Returns a transform that assigns the actual elements into the appropriate
+     * {@link BoundedWindow windows}. Will be called after {@link #prepareActuals()}.
+     */
+    <T> PTransform<PCollection<T>, PCollection<T>> windowActuals();
+  }
+
+  /**
+   * An {@link AssertionWindows} which assigns all elements to the {@link GlobalWindow}.
+   */
+  private static class IntoGlobalWindow implements AssertionWindows, Serializable {
+    public static AssertionWindows of() {
+      return new IntoGlobalWindow();
+    }
+
+    private <T> PTransform<PCollection<T>, PCollection<T>> window() {
+      return Window.into(new GlobalWindows());
+    }
+
+    @Override
+    public <T> PTransform<PCollection<T>, PCollection<T>> windowDummy() {
+      return window();
+    }
+
+    /**
+     * Rewindows all input elements into the {@link GlobalWindow}. This ensures that the result
+     * PCollection will contain all of the elements of the PCollection when the window is not
+     * specified.
+     */
+    @Override
+    public <T> PTransform<PCollection<T>, PCollection<T>> prepareActuals() {
+      return window();
+    }
+
+    @Override
+    public <T> PTransform<PCollection<T>, PCollection<T>> windowActuals() {
+      return window();
+    }
+  }
+
+  private static class IntoStaticWindows implements AssertionWindows {
+    private final StaticWindows windowFn;
+
+    public static AssertionWindows of(Coder<BoundedWindow> windowCoder, BoundedWindow window) {
+      return new IntoStaticWindows(StaticWindows.of(windowCoder, window));
+    }
+
+    private IntoStaticWindows(StaticWindows windowFn) {
+      this.windowFn = windowFn;
+    }
+
+    @Override
+    public <T> PTransform<PCollection<T>, PCollection<T>> windowDummy() {
+      return Window.into(windowFn);
+    }
+
+    @Override
+    public <T> PTransform<PCollection<T>, PCollection<T>> prepareActuals() {
+      return new FilterWindows<>(windowFn);
+    }
+
+    @Override
+    public <T> PTransform<PCollection<T>, PCollection<T>> windowActuals() {
+      return Window.into(windowFn.intoOnlyExisting());
+    }
+  }
+
+  /**
+   * A PTransform that keeps only the elements whose window is in a static collection of windows.
+   */
+  private static final class FilterWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    private final StaticWindows windows;
+
+    public FilterWindows(StaticWindows windows) {
+      this.windows = windows;
+    }
+
+    @Override
+    public PCollection<T> apply(PCollection<T> input) {
+      return input.apply("FilterWindows", ParDo.of(new Fn()));
+    }
+
+    private class Fn extends DoFn<T, T> implements RequiresWindowAccess {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        if (windows.getWindows().contains(c.window())) {
+          c.output(c.element());
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f449881/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index fdc8719..bafd897 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.testing;
 
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -25,11 +27,21 @@ import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import com.google.common.collect.Iterables;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -148,6 +160,45 @@ public class PAssertTest implements Serializable {
   }
 
   /**
+   * A {@link PAssert} about the contents of a {@link PCollection}
+   * is allowed to be verified by an arbitrary {@link SerializableFunction},
+   * though.
+   */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testWindowedSerializablePredicate() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+
+    PCollection<NotSerializableObject> pcollection = pipeline
+        .apply(Create.timestamped(
+            TimestampedValue.of(new NotSerializableObject(), new Instant(250L)),
+            TimestampedValue.of(new NotSerializableObject(), new Instant(500L)))
+            .withCoder(NotSerializableObjectCoder.of()))
+        .apply(Window.<NotSerializableObject>into(FixedWindows.of(Duration.millis(300L))));
+
+    PAssert.that(pcollection)
+        .inWindow(new IntervalWindow(new Instant(0L), new Instant(300L)))
+        .satisfies(new SerializableFunction<Iterable<NotSerializableObject>, Void>() {
+          @Override
+          public Void apply(Iterable<NotSerializableObject> contents) {
+            assertThat(Iterables.isEmpty(contents), is(false));
+            return null; // no problem!
+          }
+        });
+    PAssert.that(pcollection)
+        .inWindow(new IntervalWindow(new Instant(300L), new Instant(600L)))
+        .satisfies(new SerializableFunction<Iterable<NotSerializableObject>, Void>() {
+          @Override
+          public Void apply(Iterable<NotSerializableObject> contents) {
+            assertThat(Iterables.isEmpty(contents), is(false));
+            return null; // no problem!
+          }
+        });
+
+    pipeline.run();
+  }
+
+  /**
    * Test that we throw an error at pipeline construction time when the user mistakenly uses
    * {@code PAssert.thatSingleton().equals()} instead of the test method {@code .isEqualTo}.
    */
@@ -220,6 +271,26 @@ public class PAssertTest implements Serializable {
   }
 
   /**
+   * Basic test for {@code isEqualTo} restricted to a single window with {@code inWindow}.
+   */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testWindowedIsEqualTo() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<Integer> pcollection =
+        pipeline.apply(Create.timestamped(TimestampedValue.of(43, new Instant(250L)),
+            TimestampedValue.of(22, new Instant(-250L))))
+            .apply(Window.<Integer>into(FixedWindows.of(Duration.millis(500L))));
+    PAssert.thatSingleton(pcollection)
+        .inWindow(new IntervalWindow(new Instant(0L), new Instant(500L)))
+        .isEqualTo(43);
+    PAssert.thatSingleton(pcollection)
+        .inWindow(new IntervalWindow(new Instant(-500L), new Instant(0L)))
+        .isEqualTo(22);
+    pipeline.run();
+  }
+
+  /**
    * Basic test for {@code notEqualTo}.
    */
   @Test
@@ -244,6 +315,51 @@ public class PAssertTest implements Serializable {
   }
 
   /**
+   * Tests that {@code containsInAnyOrder} is order-independent in the {@link GlobalWindow}.
+   */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testGlobalWindowContainsInAnyOrder() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
+    PAssert.that(pcollection).inWindow(GlobalWindow.INSTANCE).containsInAnyOrder(2, 1, 4, 3);
+    pipeline.run();
+  }
+
+  /**
+   * Tests that windowed {@code containsInAnyOrder} is actually order-independent.
+   */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testWindowedContainsInAnyOrder() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<Integer> pcollection =
+        pipeline.apply(Create.timestamped(TimestampedValue.of(1, new Instant(100L)),
+            TimestampedValue.of(2, new Instant(200L)),
+            TimestampedValue.of(3, new Instant(300L)),
+            TimestampedValue.of(4, new Instant(400L))))
+            .apply(Window.<Integer>into(SlidingWindows.of(Duration.millis(200L))
+                .every(Duration.millis(100L))
+                .withOffset(Duration.millis(50L))));
+
+    PAssert.that(pcollection)
+        .inWindow(new IntervalWindow(new Instant(-50L), new Instant(150L))).containsInAnyOrder(1);
+    PAssert.that(pcollection)
+        .inWindow(new IntervalWindow(new Instant(50L), new Instant(250L)))
+        .containsInAnyOrder(2, 1);
+    PAssert.that(pcollection)
+        .inWindow(new IntervalWindow(new Instant(150L), new Instant(350L)))
+        .containsInAnyOrder(2, 3);
+    PAssert.that(pcollection)
+        .inWindow(new IntervalWindow(new Instant(250L), new Instant(450L)))
+        .containsInAnyOrder(4, 3);
+    PAssert.that(pcollection)
+        .inWindow(new IntervalWindow(new Instant(350L), new Instant(550L)))
+        .containsInAnyOrder(4);
+    pipeline.run();
+  }
+
+  /**
    * Tests that {@code containsInAnyOrder} fails when and how it should.
    */
   @Test


[3/5] incubator-beam git commit: Update test for GatherAllPanes

Posted by ke...@apache.org.
Update test for GatherAllPanes


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ec1bb3a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ec1bb3a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ec1bb3a6

Branch: refs/heads/master
Commit: ec1bb3a62cb69fc221ed0370679fb91ffafebf66
Parents: a1365bb
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jun 20 14:39:54 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 24 10:46:48 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/util/GatherAllPanesTest.java    | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec1bb3a6/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
index e9be41e..a6522ef 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Values;
@@ -33,6 +34,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 import com.google.common.collect.Iterables;
@@ -98,8 +100,12 @@ public class GatherAllPanesTest implements Serializable {
   public void multiplePanesMultipleReifiedPane() {
     TestPipeline p = TestPipeline.create();
 
+    PCollection<Long> someElems = p.apply("someLongs", CountingInput.upTo(20000));
+    PCollection<Long> otherElems = p.apply("otherLongs", CountingInput.upTo(20000));
     PCollection<Iterable<WindowedValue<Iterable<Long>>>> accumulatedPanes =
-        p.apply(CountingInput.upTo(20000))
+        PCollectionList.of(someElems)
+            .and(otherElems)
+            .apply(Flatten.<Long>pCollections())
             .apply(
                 WithTimestamps.of(
                     new SerializableFunction<Long, Instant>() {


[5/5] incubator-beam git commit: This closes #518

Posted by ke...@apache.org.
This closes #518


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ff72e27f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ff72e27f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ff72e27f

Branch: refs/heads/master
Commit: ff72e27fca7acaa2922caa290489536a855bef92
Parents: bd21ead 1f44988
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jun 24 14:23:54 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 24 14:23:54 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/testing/PAssert.java    | 369 +++++++++++++++----
 .../apache/beam/sdk/testing/StaticWindows.java  | 110 ++++++
 .../apache/beam/sdk/testing/WindowSupplier.java |  83 +++++
 .../apache/beam/sdk/util/GatherAllPanes.java    |  12 +-
 .../apache/beam/sdk/testing/PAssertTest.java    | 116 ++++++
 .../beam/sdk/testing/StaticWindowsTest.java     |  94 +++++
 .../beam/sdk/testing/WindowSupplierTest.java    |  89 +++++
 .../beam/sdk/util/GatherAllPanesTest.java       |   8 +-
 8 files changed, 808 insertions(+), 73 deletions(-)
----------------------------------------------------------------------



[2/5] incubator-beam git commit: Key with integers in GatherAllPanes

Posted by ke...@apache.org.
Key with integers in GatherAllPanes

Ensures that runners which do not support null values can handle
GatherAllPanes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1365bb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1365bb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1365bb2

Branch: refs/heads/master
Commit: a1365bb2ca9f75d607e594a20810f53ef9232f9d
Parents: a2ab828
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 17 15:19:26 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 24 10:46:45 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/util/GatherAllPanes.java   | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1365bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
index ab40678..0f2ecd0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
@@ -57,15 +57,17 @@ public class GatherAllPanes<T>
     WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();
 
     return input
-        .apply(WithKeys.<Void, T>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
-        .apply(new ReifyTimestampsAndWindows<Void, T>())
+        .apply(WithKeys.<Integer, T>of(0).withKeyType(new TypeDescriptor<Integer>() {}))
+        .apply(new ReifyTimestampsAndWindows<Integer, T>())
         .apply(
             Window.into(
-                    new IdentityWindowFn<KV<Void, WindowedValue<T>>>(
+                    new IdentityWindowFn<KV<Integer, WindowedValue<T>>>(
                         originalWindowFn.windowCoder()))
-                .triggering(Never.ever()))
+                .triggering(Never.ever())
+                .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
+                .discardingFiredPanes())
         // all values have the same key so they all appear as a single output element
-        .apply(GroupByKey.<Void, WindowedValue<T>>create())
+        .apply(GroupByKey.<Integer, WindowedValue<T>>create())
         .apply(Values.<Iterable<WindowedValue<T>>>create())
         .setWindowingStrategyInternal(input.getWindowingStrategy());
   }