You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/09 21:58:19 UTC

[1/4] incubator-beam git commit: Spark runner: Assign windows when re-windowing into global window

Repository: incubator-beam
Updated Branches:
  refs/heads/master d6adbbf96 -> d53e96a0d


Spark runner: Assign windows when re-windowing into global window

Previously, window assignment was elided whenever the destination window
was the global window. But when the source windows are not the global window,
this elision is not correct. Now window assignment is run except
when both the source *and* the destination window are the global window
(which remains a common case in globally windowed batch tests
using PAssert).


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d53e96a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d53e96a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d53e96a0

Branch: refs/heads/master
Commit: d53e96a0d1f5f26ad0e3efc90dc9f7b53135443b
Parents: f222df1
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 9 09:15:39 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 9 14:41:09 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/spark/translation/TransformTranslator.java   | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d53e96a0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index b462d35..ebceb6b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -699,13 +699,16 @@ public final class TransformTranslator {
         JavaRDDLike<WindowedValue<T>, ?> inRDD =
             (JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform);
         WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
-        if (windowFn instanceof GlobalWindows) {
+        // Avoid running assign windows if both source and destination are global window
+        if (context.getInput(transform).getWindowingStrategy().getWindowFn()
+                instanceof GlobalWindows
+            && windowFn instanceof GlobalWindows) {
           context.setOutputRDD(transform, inRDD);
         } else {
           @SuppressWarnings("unchecked")
           DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
           DoFnFunction<T, T> dofn =
-                  new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), null);
+              new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), null);
           context.setOutputRDD(transform, inRDD.mapPartitions(dofn));
         }
       }


[3/4] incubator-beam git commit: Switch Spark streaming tests to custom assertions

Posted by ke...@apache.org.
Switch Spark streaming tests to custom assertions

The current use of PAssert in the streaming tests for
the Spark runner works only by coincidence. PAssert does not
truly support non-global windowing. The switch from
side inputs to GBK, with no change in semantics but hopefully
an easier on-ramp for new runners, incidentally broke
these tests. Soon, PAssert will support windowing, triggers,
and unbounded PCollections. Until then, this change
writes a slightly custom assertion transform for these tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f222df10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f222df10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f222df10

Branch: refs/heads/master
Commit: f222df109e773f23e56f9e830454356893989a15
Parents: 77aa093
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jun 8 18:11:07 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 9 14:41:09 2016 -0700

----------------------------------------------------------------------
 .../streaming/FlattenStreamingTest.java         |  7 +---
 .../streaming/KafkaStreamingTest.java           | 13 ++-----
 .../streaming/SimpleStreamingWordCountTest.java | 18 +++-------
 .../streaming/utils/PAssertStreaming.java       | 36 +++++++++++++++++++-
 4 files changed, 43 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f222df10/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index 15b2f39..976c7c2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -25,9 +25,7 @@ import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreamin
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
@@ -77,13 +75,10 @@ public class FlattenStreamingTest {
     PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
     PCollection<String> union = list.apply(Flatten.<String>pCollections());
 
-    PAssert.thatIterable(union.apply(View.<String>asIterable()))
-            .containsInAnyOrder(EXPECTED_UNION);
+    PAssertStreaming.assertContents(union, EXPECTED_UNION);
 
     EvaluationResult res = SparkPipelineRunner.create(options).run(p);
     res.close();
-
-    PAssertStreaming.assertNoFailures(res);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f222df10/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index fd75e74..53293fb 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -27,17 +27,14 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -52,7 +49,6 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 import kafka.serializer.StringDecoder;
 /**
@@ -68,9 +64,7 @@ public class KafkaStreamingTest {
   private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of(
       "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
   );
-  private static final Set<String> EXPECTED = ImmutableSet.of(
-      "k1,v1", "k2,v2", "k3,v3", "k4,v4"
-  );
+  private static final String[] EXPECTED = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"};
   private static final long TEST_TIMEOUT_MSEC = 1000L;
 
   @BeforeClass
@@ -116,13 +110,10 @@ public class KafkaStreamingTest {
 
     PCollection<String> formattedKV = windowedWords.apply(ParDo.of(new FormatKVFn()));
 
-    PAssert.thatIterable(formattedKV.apply(View.<String>asIterable()))
-        .containsInAnyOrder(EXPECTED);
+    PAssertStreaming.assertContents(formattedKV, EXPECTED);
 
     EvaluationResult res = SparkPipelineRunner.create(options).run(p);
     res.close();
-
-    PAssertStreaming.assertNoFailures(res);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f222df10/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 28133ca..6dc9a08 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.spark.translation.streaming;
 
+
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SimpleWordCountTest;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
@@ -26,33 +27,28 @@ import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreamin
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
 
-import com.google.common.collect.ImmutableSet;
-
 import org.joda.time.Duration;
 import org.junit.Test;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 
 /**
  * Simple word count streaming test.
  */
-public class SimpleStreamingWordCountTest {
+public class SimpleStreamingWordCountTest implements Serializable {
 
   private static final String[] WORDS_ARRAY = {
       "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
   private static final List<Iterable<String>> WORDS_QUEUE =
       Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY));
-  private static final Set<String> EXPECTED_COUNT_SET =
-      ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
+  private static final String[] EXPECTED_COUNTS = {"hi: 5", "there: 1", "sue: 2", "bob: 2"};
   private static final long TEST_TIMEOUT_MSEC = 1000L;
 
   @Test
@@ -71,12 +67,8 @@ public class SimpleStreamingWordCountTest {
 
     PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
 
-    PAssert.thatIterable(output.apply(View.<String>asIterable()))
-        .containsInAnyOrder(EXPECTED_COUNT_SET);
-
+    PAssertStreaming.assertContents(output, EXPECTED_COUNTS);
     EvaluationResult res = SparkPipelineRunner.create(options).run(p);
     res.close();
-
-    PAssertStreaming.assertNoFailures(res);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f222df10/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 3d8fc32..f85c440 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -17,15 +17,26 @@
  */
 package org.apache.beam.runners.spark.translation.streaming.utils;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
 import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.PCollection;
 
 import org.junit.Assert;
 
+import java.io.Serializable;
+
 /**
  * Since PAssert doesn't propagate assert exceptions, use Aggregators to assert streaming
  * success/failure counters.
  */
-public final class PAssertStreaming {
+public final class PAssertStreaming implements Serializable {
 
   /**
    * Copied aggregator names from {@link org.apache.beam.sdk.testing.PAssert}.
@@ -40,4 +51,27 @@ public final class PAssertStreaming {
     int failures = res.getAggregatorValue(FAILURE_COUNTER, Integer.class);
     Assert.assertEquals("Found " + failures + " failures, see the log for details", 0, failures);
   }
+
+  /**
+   * Adds a pipeline run-time assertion that the contents of {@code actual} are {@code expected}.
+   * Note that it is oblivious to windowing, so the assertion will apply indiscriminately to all
+   * windows.
+   */
+  public static <T> void assertContents(PCollection<T> actual, final T[] expected) {
+    // Because PAssert does not support non-global windowing, but all our data is in one window,
+    // we set up the assertion directly.
+    actual
+        .apply(WithKeys.<String, T>of("dummy"))
+        .apply(GroupByKey.<String, T>create())
+        .apply(Values.<Iterable<T>>create())
+        .apply(
+            MapElements.via(
+                new SimpleFunction<Iterable<T>, Void>() {
+                  @Override
+                  public Void apply(Iterable<T> input) {
+                    assertThat(input, containsInAnyOrder(expected));
+                    return null;
+                  }
+                }));
+  }
 }


[4/4] incubator-beam git commit: Base PAssert on GBK instead of side inputs

Posted by ke...@apache.org.
Base PAssert on GBK instead of side inputs

Previously, PAssert - hence all RunnableOnService/NeedsRunner
tests - required side input support. This created a very steep
on-ramp for new runners.

GroupByKey is a bit more fundamental and most backends will be
able to group by key in the global window very quickly. So switching
the primitive used to gather all the contents of a PCollection for
assertions should make it a bit easier to get early feedback during
runner development.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/810ffeb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/810ffeb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/810ffeb2

Branch: refs/heads/master
Commit: 810ffeb2785bf996001c8fadb992410d1f9409c6
Parents: d6adbbf
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jun 8 15:07:52 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 9 14:41:09 2016 -0700

----------------------------------------------------------------------
 .../testing/TestDataflowPipelineRunner.java     |   3 +-
 .../org/apache/beam/sdk/testing/PAssert.java    | 737 +++++++++----------
 .../apache/beam/sdk/testing/PAssertTest.java    |  27 -
 3 files changed, 362 insertions(+), 405 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/810ffeb2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
index 3e8d903..c940e9a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
@@ -166,7 +166,8 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
   public <OutputT extends POutput, InputT extends PInput> OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
     if (transform instanceof PAssert.OneSideInputAssert
-        || transform instanceof PAssert.TwoSideInputAssert) {
+        || transform instanceof PAssert.GroupThenAssert
+        || transform instanceof PAssert.GroupThenAssertForSingleton) {
       expectedNumberOfAssertions += 1;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/810ffeb2/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index c2cd598..b10c1cb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -34,11 +34,14 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -48,32 +51,27 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
 /**
- * An assertion on the contents of a {@link PCollection}
- * incorporated into the pipeline.  Such an assertion
- * can be checked no matter what kind of {@link PipelineRunner} is
- * used.
+ * An assertion on the contents of a {@link PCollection} incorporated into the pipeline. Such an
+ * assertion can be checked no matter what kind of {@link PipelineRunner} is used.
  *
- * <p>Note that the {@code PAssert} call must precede the call
- * to {@link Pipeline#run}.
+ * <p>Note that the {@code PAssert} call must precede the call to {@link Pipeline#run}.
  *
- * <p>Examples of use:
- * <pre>{@code
+ * <p>Examples of use: <pre>{@code
  * Pipeline p = TestPipeline.create();
  * ...
  * PCollection<String> output =
@@ -107,30 +105,84 @@ public class PAssert {
   private PAssert() {}
 
   /**
-   * Constructs an {@link IterableAssert} for the elements of the provided
-   * {@link PCollection}.
+   * Builder interface for assertions applicable to iterables and PCollection contents.
+   */
+  public interface IterableAssert<T> {
+
+    /**
+     * Asserts that the iterable in question contains the provided elements.
+     *
+     * @return the same {@link IterableAssert} builder for further assertions
+     */
+    IterableAssert<T> containsInAnyOrder(T... expectedElements);
+
+    /**
+     * Asserts that the iterable in question contains the provided elements.
+     *
+     * @return the same {@link IterableAssert} builder for further assertions
+     */
+    IterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements);
+
+    /**
+     * Asserts that the iterable in question is empty.
+     *
+     * @return the same {@link IterableAssert} builder for further assertions
+     */
+    IterableAssert<T> empty();
+
+    /**
+     * Applies the provided checking function (presumably containing assertions) to the
+     * iterable in question.
+     *
+     * @return the same {@link IterableAssert} builder for further assertions
+     */
+    IterableAssert<T> satisfies(SerializableFunction<Iterable<T>, Void> checkerFn);
+  }
+
+  /**
+   * Builder interface for assertions applicable to a single value.
+   */
+  public interface SingletonAssert<T> {
+    /**
+     * Asserts that the value in question is equal to the provided value, according to
+     * {@link Object#equals}.
+     *
+     * @return the same {@link SingletonAssert} builder for further assertions
+     */
+    SingletonAssert<T> isEqualTo(T expected);
+
+    /**
+     * Asserts that the value in question is not equal to the provided value, according
+     * to {@link Object#equals}.
+     *
+     * @return the same {@link SingletonAssert} builder for further assertions
+     */
+    SingletonAssert<T> notEqualTo(T notExpected);
+
+    /**
+     * Applies the provided checking function (presumably containing assertions) to the
+     * value in question.
+     *
+     * @return the same {@link SingletonAssert} builder for further assertions
+     */
+    SingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn);
+  }
+
+  /**
+   * Constructs an {@link IterableAssert} for the elements of the provided {@link PCollection}.
    */
   public static <T> IterableAssert<T> that(PCollection<T> actual) {
-    return new IterableAssert<>(
-        new CreateActual<T, Iterable<T>>(actual, View.<T>asIterable()),
-         actual.getPipeline())
-         .setCoder(actual.getCoder());
+    return new PCollectionContentsAssert<>(actual);
   }
 
   /**
-   * Constructs an {@link IterableAssert} for the value of the provided
-   * {@link PCollection} which must contain a single {@code Iterable<T>}
-   * value.
+   * Constructs an {@link IterableAssert} for the value of the provided {@link PCollection} which
+   * must contain a single {@code Iterable<T>} value.
    */
-  public static <T> IterableAssert<T>
-      thatSingletonIterable(PCollection<? extends Iterable<T>> actual) {
+  public static <T> IterableAssert<T> thatSingletonIterable(
+      PCollection<? extends Iterable<T>> actual) {
 
-    List<? extends Coder<?>> maybeElementCoder = actual.getCoder().getCoderArguments();
-    Coder<T> tCoder;
     try {
-      @SuppressWarnings("unchecked")
-      Coder<T> tCoderTmp = (Coder<T>) Iterables.getOnlyElement(maybeElementCoder);
-      tCoder = tCoderTmp;
     } catch (NoSuchElementException | IllegalArgumentException exc) {
       throw new IllegalArgumentException(
           "PAssert.<T>thatSingletonIterable requires a PCollection<Iterable<T>>"
@@ -141,19 +193,7 @@ public class PAssert {
     @SuppressWarnings("unchecked") // Safe covariant cast
     PCollection<Iterable<T>> actualIterables = (PCollection<Iterable<T>>) actual;
 
-    return new IterableAssert<>(
-        new CreateActual<Iterable<T>, Iterable<T>>(
-            actualIterables, View.<Iterable<T>>asSingleton()),
-        actual.getPipeline())
-        .setCoder(tCoder);
-  }
-
-  /**
-   * Constructs an {@link IterableAssert} for the value of the provided
-   * {@code PCollectionView PCollectionView<Iterable<T>>}.
-   */
-  public static <T> IterableAssert<T> thatIterable(PCollectionView<Iterable<T>> actual) {
-    return new IterableAssert<>(new PreExisting<Iterable<T>>(actual), actual.getPipeline());
+    return new PCollectionSingletonIterableAssert<>(actualIterables);
   }
 
   /**
@@ -161,93 +201,95 @@ public class PAssert {
    * {@code PCollection PCollection<T>}, which must be a singleton.
    */
   public static <T> SingletonAssert<T> thatSingleton(PCollection<T> actual) {
-    return new SingletonAssert<>(
-        new CreateActual<T, T>(actual, View.<T>asSingleton()), actual.getPipeline())
-        .setCoder(actual.getCoder());
+    return new PCollectionViewAssert<>(actual, View.<T>asSingleton(), actual.getCoder());
   }
 
   /**
    * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}.
    *
-   * <p>Note that the actual value must be coded by a {@link KvCoder},
-   * not just any {@code Coder<K, V>}.
+   * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any
+   * {@code Coder<K, V>}.
    */
-  public static <K, V> SingletonAssert<Map<K, Iterable<V>>>
-      thatMultimap(PCollection<KV<K, V>> actual) {
+  public static <K, V> SingletonAssert<Map<K, Iterable<V>>> thatMultimap(
+      PCollection<KV<K, V>> actual) {
     @SuppressWarnings("unchecked")
     KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
-
-    return new SingletonAssert<>(
-        new CreateActual<>(actual, View.<K, V>asMultimap()), actual.getPipeline())
-        .setCoder(MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder())));
+    return new PCollectionViewAssert<>(
+        actual,
+        View.<K, V>asMultimap(),
+        MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder())));
   }
 
   /**
-   * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection},
-   * which must have at most one value per key.
+   * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}, which
+   * must have at most one value per key.
    *
-   * <p>Note that the actual value must be coded by a {@link KvCoder},
-   * not just any {@code Coder<K, V>}.
+   * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any
+   * {@code Coder<K, V>}.
    */
   public static <K, V> SingletonAssert<Map<K, V>> thatMap(PCollection<KV<K, V>> actual) {
     @SuppressWarnings("unchecked")
     KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
-
-    return new SingletonAssert<>(
-        new CreateActual<>(actual, View.<K, V>asMap()), actual.getPipeline())
-        .setCoder(MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()));
+    return new PCollectionViewAssert<>(
+        actual, View.<K, V>asMap(), MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()));
   }
 
   ////////////////////////////////////////////////////////////
 
   /**
-   * An assertion about the contents of a {@link PCollectionView} yielding an {@code Iterable<T>}.
+   * An {@link IterableAssert} about the contents of a {@link PCollection}. This does not require
+   * the runner to support side inputs.
    */
-  public static class IterableAssert<T> implements Serializable {
-    private final Pipeline pipeline;
-    private final PTransform<PBegin, PCollectionView<Iterable<T>>> createActual;
-    private Optional<Coder<T>> coder;
+  private static class PCollectionContentsAssert<T> implements IterableAssert<T> {
+    private final PCollection<T> actual;
 
-    protected IterableAssert(
-        PTransform<PBegin, PCollectionView<Iterable<T>>> createActual, Pipeline pipeline) {
-      this.createActual = createActual;
-      this.pipeline = pipeline;
-      this.coder = Optional.absent();
+    public PCollectionContentsAssert(PCollection<T> actual) {
+      this.actual = actual;
     }
 
     /**
-     * Sets the coder to use for elements of type {@code T}, as needed for internal purposes.
+     * Checks that the {@code Iterable} contains the expected elements, in any order.
      *
      * <p>Returns this {@code IterableAssert}.
      */
-    public IterableAssert<T> setCoder(Coder<T> coderOrNull) {
-      this.coder = Optional.fromNullable(coderOrNull);
-      return this;
+    @Override
+    @SafeVarargs
+    public final PCollectionContentsAssert<T> containsInAnyOrder(T... expectedElements) {
+      return containsInAnyOrder(Arrays.asList(expectedElements));
     }
 
     /**
-     * Gets the coder, which may yet be absent.
+     * Checks that the {@code Iterable} contains the expected elements, in any order.
+     *
+     * <p>Returns this {@code IterableAssert}.
      */
-    public Coder<T> getCoder() {
-      if (coder.isPresent()) {
-        return coder.get();
-      } else {
-        throw new IllegalStateException(
-            "Attempting to access the coder of an IterableAssert"
-                + " that has not been set yet.");
-      }
+    @Override
+    public PCollectionContentsAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
+      return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
+    }
+
+    @Override
+    public PCollectionContentsAssert<T> empty() {
+      return containsInAnyOrder(Collections.<T>emptyList());
+    }
+
+    @Override
+    public PCollectionContentsAssert<T> satisfies(
+        SerializableFunction<Iterable<T>, Void> checkerFn) {
+      actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn));
+      return this;
     }
 
     /**
-     * Applies a {@link SerializableFunction} to check the elements of the {@code Iterable}.
+     * Checks that the {@code Iterable} contains elements that match the provided matchers, in any
+     * order.
      *
      * <p>Returns this {@code IterableAssert}.
      */
-    public IterableAssert<T> satisfies(SerializableFunction<Iterable<T>, Void> checkerFn) {
-      pipeline.apply(
-          "PAssert$" + (assertCount++),
-          new OneSideInputAssert<Iterable<T>>(createActual, checkerFn));
-      return this;
+    @SafeVarargs
+    final PCollectionContentsAssert<T> containsInAnyOrder(
+        SerializableMatcher<? super T>... elementMatchers) {
+      return satisfies(SerializableMatchers.<T>containsInAnyOrder(elementMatchers));
     }
 
     /**
@@ -255,17 +297,11 @@ public class PAssert {
      *
      * <p>Returns this {@code IterableAssert}.
      */
-    public IterableAssert<T> satisfies(
-        AssertRelation<Iterable<T>, Iterable<T>> relation,
-        final Iterable<T> expectedElements) {
-      pipeline.apply(
-          "PAssert$" + (assertCount++),
-          new TwoSideInputAssert<Iterable<T>, Iterable<T>>(
-              createActual,
-              new CreateExpected<T, Iterable<T>>(expectedElements, coder, View.<T>asIterable()),
-              relation));
-
-      return this;
+    private PCollectionContentsAssert<T> satisfies(
+        AssertRelation<Iterable<T>, Iterable<T>> relation, Iterable<T> expectedElements) {
+      return satisfies(
+          new CheckRelationAgainstExpected<Iterable<T>>(
+              relation, expectedElements, IterableCoder.of(actual.getCoder())));
     }
 
     /**
@@ -273,15 +309,14 @@ public class PAssert {
      *
      * <p>Returns this {@code IterableAssert}.
      */
-    IterableAssert<T> satisfies(final SerializableMatcher<Iterable<? extends T>> matcher) {
+    PCollectionContentsAssert<T> satisfies(
+        final SerializableMatcher<Iterable<? extends T>> matcher) {
       // Safe covariant cast. Could be elided by changing a lot of this file to use
       // more flexible bounds.
       @SuppressWarnings({"rawtypes", "unchecked"})
       SerializableFunction<Iterable<T>, Void> checkerFn =
-        (SerializableFunction) new MatcherCheckerFn<>(matcher);
-      pipeline.apply(
-          "PAssert$" + (assertCount++),
-          new OneSideInputAssert<Iterable<T>>(createActual, checkerFn));
+          (SerializableFunction) new MatcherCheckerFn<>(matcher);
+      actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn));
       return this;
     }
 
@@ -300,19 +335,9 @@ public class PAssert {
     }
 
     /**
-     * Checks that the {@code Iterable} is empty.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    public IterableAssert<T> empty() {
-      return satisfies(new AssertContainsInAnyOrderRelation<T>(), Collections.<T>emptyList());
-    }
-
-    /**
      * @throws UnsupportedOperationException always
-     * @deprecated {@link Object#equals(Object)} is not supported on PAssert objects.
-     *    If you meant to test object equality, use a variant of {@link #containsInAnyOrder}
-     *    instead.
+     * @deprecated {@link Object#equals(Object)} is not supported on PAssert objects. If you meant
+     * to test object equality, use a variant of {@link #containsInAnyOrder} instead.
      */
     @Deprecated
     @Override
@@ -331,169 +356,129 @@ public class PAssert {
       throw new UnsupportedOperationException(
           String.format("%s.hashCode() is not supported.", IterableAssert.class.getSimpleName()));
     }
+  }
 
-    /**
-     * Checks that the {@code Iterable} contains the expected elements, in any
-     * order.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    public IterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
-      return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
-    }
+  /**
+   * An {@link IterableAssert} for an iterable that is the sole element of a {@link PCollection}.
+   * This does not require the runner to support side inputs.
+   */
+  private static class PCollectionSingletonIterableAssert<T> implements IterableAssert<T> {
+    private final PCollection<Iterable<T>> actual;
+    private final Coder<T> elementCoder;
 
-    /**
-     * Checks that the {@code Iterable} contains the expected elements, in any
-     * order.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    @SafeVarargs
-    public final IterableAssert<T> containsInAnyOrder(T... expectedElements) {
-      return satisfies(
-        new AssertContainsInAnyOrderRelation<T>(),
-        Arrays.asList(expectedElements));
+    public PCollectionSingletonIterableAssert(PCollection<Iterable<T>> actual) {
+      this.actual = actual;
+
+      @SuppressWarnings("unchecked")
+      Coder<T> typedCoder = (Coder<T>) actual.getCoder().getCoderArguments().get(0);
+      this.elementCoder = typedCoder;
     }
 
-    /**
-     * Checks that the {@code Iterable} contains elements that match the provided matchers,
-     * in any order.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
+    @Override
     @SafeVarargs
-    final IterableAssert<T> containsInAnyOrder(
-        SerializableMatcher<? super T>... elementMatchers) {
-      return satisfies(SerializableMatchers.<T>containsInAnyOrder(elementMatchers));
+    public final PCollectionSingletonIterableAssert<T> containsInAnyOrder(T... expectedElements) {
+      return containsInAnyOrder(Arrays.asList(expectedElements));
     }
-  }
 
-  /**
-   * An assertion about the single value of type {@code T}
-   * associated with a {@link PCollectionView}.
-   */
-  public static class SingletonAssert<T> implements Serializable {
-    private final Pipeline pipeline;
-    private final CreateActual<?, T> createActual;
-    private Optional<Coder<T>> coder;
-
-    protected SingletonAssert(
-        CreateActual<?, T> createActual, Pipeline pipeline) {
-      this.pipeline = pipeline;
-      this.createActual = createActual;
-      this.coder = Optional.absent();
+    @Override
+    public PCollectionSingletonIterableAssert<T> empty() {
+      return containsInAnyOrder(Collections.<T>emptyList());
     }
 
-    /**
-     * Always throws an {@link UnsupportedOperationException}: users are probably looking for
-     * {@link #isEqualTo}.
-     */
-    @Deprecated
     @Override
-    public boolean equals(Object o) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "tests for Java equality of the %s object, not the PCollection in question. "
-                  + "Call a test method, such as isEqualTo.",
-              getClass().getSimpleName()));
+    public PCollectionSingletonIterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
+      return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
     }
 
-    /**
-     * @throws UnsupportedOperationException always.
-     * @deprecated {@link Object#hashCode()} is not supported on PAssert objects.
-     */
-    @Deprecated
     @Override
-    public int hashCode() {
-      throw new UnsupportedOperationException(
-          String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName()));
+    public PCollectionSingletonIterableAssert<T> satisfies(
+        SerializableFunction<Iterable<T>, Void> checkerFn) {
+      actual.apply("PAssert$" + (assertCount++), new GroupThenAssertForSingleton<>(checkerFn));
+      return this;
     }
 
-    /**
-     * Sets the coder to use for elements of type {@code T}, as needed
-     * for internal purposes.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    public SingletonAssert<T> setCoder(Coder<T> coderOrNull) {
-      this.coder = Optional.fromNullable(coderOrNull);
-      return this;
+    private PCollectionSingletonIterableAssert<T> satisfies(
+        AssertRelation<Iterable<T>, Iterable<T>> relation, Iterable<T> expectedElements) {
+      return satisfies(
+          new CheckRelationAgainstExpected<Iterable<T>>(
+              relation, expectedElements, IterableCoder.of(elementCoder)));
     }
+  }
 
-    /**
-     * Gets the coder, which may yet be absent.
-     */
-    public Coder<T> getCoder() {
-      if (coder.isPresent()) {
-        return coder.get();
-      } else {
-        throw new IllegalStateException(
-            "Attempting to access the coder of an IterableAssert that has not been set yet.");
-      }
+  /**
+   * An assertion about the contents of a {@link PCollection} when it is viewed as a single value
+   * of type {@code ViewT}. This requires side input support from the runner.
+   */
+  private static class PCollectionViewAssert<ElemT, ViewT> implements SingletonAssert<ViewT> {
+    private final PCollection<ElemT> actual;
+    private final PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view;
+    private final Coder<ViewT> coder;
+
+    protected PCollectionViewAssert(
+        PCollection<ElemT> actual,
+        PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view,
+        Coder<ViewT> coder) {
+      this.actual = actual;
+      this.view = view;
+      this.coder = coder;
     }
 
-    /**
-     * Applies a {@link SerializableFunction} to check the value of this
-     * {@code SingletonAssert}'s view.
-     *
-     * <p>Returns this {@code SingletonAssert}.
-     */
-    public SingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn) {
-      pipeline.apply(
-          "PAssert$" + (assertCount++), new OneSideInputAssert<T>(createActual, checkerFn));
-      return this;
+    @Override
+    public PCollectionViewAssert<ElemT, ViewT> isEqualTo(ViewT expectedValue) {
+      return satisfies(new AssertIsEqualToRelation<ViewT>(), expectedValue);
     }
 
-    /**
-     * Applies an {@link AssertRelation} to check the provided relation against the
-     * value of this assert and the provided expected value.
-     *
-     * <p>Returns this {@code SingletonAssert}.
-     */
-    public SingletonAssert<T> satisfies(
-        AssertRelation<T, T> relation,
-        final T expectedValue) {
-      pipeline.apply(
-          "PAssert$" + (assertCount++),
-          new TwoSideInputAssert<T, T>(
-              createActual,
-              new CreateExpected<T, T>(Arrays.asList(expectedValue), coder, View.<T>asSingleton()),
-              relation));
+    @Override
+    public PCollectionViewAssert<ElemT, ViewT> notEqualTo(ViewT expectedValue) {
+      return satisfies(new AssertNotEqualToRelation<ViewT>(), expectedValue);
+    }
 
+    @Override
+    public PCollectionViewAssert<ElemT, ViewT> satisfies(
+        SerializableFunction<ViewT, Void> checkerFn) {
+      actual
+          .getPipeline()
+          .apply(
+              "PAssert$" + (assertCount++),
+              new OneSideInputAssert<ViewT>(CreateActual.from(actual, view), checkerFn));
       return this;
     }
 
     /**
-     * Checks that the value of this {@code SingletonAssert}'s view is equal
-     * to the expected value.
+     * Applies an {@link AssertRelation} to check the provided relation against the value of this
+     * assert and the provided expected value.
      *
      * <p>Returns this {@code SingletonAssert}.
      */
-    public SingletonAssert<T> isEqualTo(T expectedValue) {
-      return satisfies(new AssertIsEqualToRelation<T>(), expectedValue);
+    private PCollectionViewAssert<ElemT, ViewT> satisfies(
+        AssertRelation<ViewT, ViewT> relation, final ViewT expectedValue) {
+      return satisfies(new CheckRelationAgainstExpected<ViewT>(relation, expectedValue, coder));
     }
 
     /**
-     * Checks that the value of this {@code SingletonAssert}'s view is not equal
-     * to the expected value.
-     *
-     * <p>Returns this {@code SingletonAssert}.
+     * Always throws an {@link UnsupportedOperationException}: users are probably looking for
+     * {@link #isEqualTo}.
      */
-    public SingletonAssert<T> notEqualTo(T expectedValue) {
-      return satisfies(new AssertNotEqualToRelation<T>(), expectedValue);
+    @Deprecated
+    @Override
+    public boolean equals(Object o) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "tests for Java equality of the %s object, not the PCollection in question. "
+                  + "Call a test method, such as isEqualTo.",
+              getClass().getSimpleName()));
     }
 
     /**
-     * Checks that the value of this {@code SingletonAssert}'s view is equal to
-     * the expected value.
-     *
-     * @deprecated replaced by {@link #isEqualTo}
+     * @throws UnsupportedOperationException always.
+     * @deprecated {@link Object#hashCode()} is not supported on {@link PAssert} objects.
      */
     @Deprecated
-    public SingletonAssert<T> is(T expectedValue) {
-      return isEqualTo(expectedValue);
+    @Override
+    public int hashCode() {
+      throw new UnsupportedOperationException(
+          String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName()));
     }
-
   }
 
   ////////////////////////////////////////////////////////////////////////
@@ -504,8 +489,13 @@ public class PAssert {
     private final transient PCollection<T> actual;
     private final transient PTransform<PCollection<T>, PCollectionView<ActualT>> actualView;
 
-    private CreateActual(PCollection<T> actual,
-        PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
+    public static <T, ActualT> CreateActual<T, ActualT> from(
+        PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
+      return new CreateActual<>(actual, actualView);
+    }
+
+    private CreateActual(
+        PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
       this.actual = actual;
       this.actualView = actualView;
     }
@@ -515,73 +505,145 @@ public class PAssert {
       final Coder<T> coder = actual.getCoder();
       return actual
           .apply(Window.<T>into(new GlobalWindows()))
-          .apply(ParDo.of(new DoFn<T, T>() {
-            @Override
-            public void processElement(ProcessContext context) throws CoderException {
-              context.output(CoderUtils.clone(coder, context.element()));
-            }
-          }))
+          .apply(
+              ParDo.of(
+                  new DoFn<T, T>() {
+                    @Override
+                    public void processElement(ProcessContext context) throws CoderException {
+                      context.output(CoderUtils.clone(coder, context.element()));
+                    }
+                  }))
           .apply(actualView);
     }
   }
 
-  private static class CreateExpected<T, ExpectedT>
-      extends PTransform<PBegin, PCollectionView<ExpectedT>> {
-
-    private final Iterable<T> elements;
-    private final Optional<Coder<T>> coder;
-    private final transient PTransform<PCollection<T>, PCollectionView<ExpectedT>> view;
+  /**
+   * A partially applied {@link AssertRelation}, where the expected value is provided along with a
+   * coder to serialize/deserialize it.
+   */
+  private static class CheckRelationAgainstExpected<T> implements SerializableFunction<T, Void> {
+    private final AssertRelation<T, T> relation;
+    private final byte[] encodedExpected;
+    private final Coder<T> coder;
 
-    private CreateExpected(Iterable<T> elements, Optional<Coder<T>> coder,
-        PTransform<PCollection<T>, PCollectionView<ExpectedT>> view) {
-      this.elements = elements;
+    public CheckRelationAgainstExpected(AssertRelation<T, T> relation, T expected, Coder<T> coder) {
+      this.relation = relation;
       this.coder = coder;
-      this.view = view;
+
+      try {
+        this.encodedExpected = CoderUtils.encodeToByteArray(coder, expected);
+      } catch (IOException coderException) {
+        throw new RuntimeException(coderException);
+      }
     }
 
     @Override
-    public PCollectionView<ExpectedT> apply(PBegin input) {
-      Create.Values<T> createTransform = Create.<T>of(elements);
-      if (coder.isPresent()) {
-        createTransform = createTransform.withCoder(coder.get());
+    public Void apply(T actual) {
+      try {
+        T expected = CoderUtils.decodeFromByteArray(coder, encodedExpected);
+        return relation.assertFor(expected).apply(actual);
+      } catch (IOException coderException) {
+        throw new RuntimeException(coderException);
       }
-      return input.apply(createTransform).apply(view);
     }
   }
 
-  private static class PreExisting<T> extends PTransform<PBegin, PCollectionView<T>> {
+  /**
+   * A transform that gathers the contents of a {@link PCollection} into a single main input
+   * iterable in the global window. This requires a runner to support {@link GroupByKey} in the
+   * global window, but not side inputs or other windowing or triggers.
+   */
+  private static class GroupGlobally<T> extends PTransform<PCollection<T>, PCollection<Iterable<T>>>
+      implements Serializable {
 
-    private final PCollectionView<T> view;
+    public GroupGlobally() {}
 
-    private PreExisting(PCollectionView<T> view) {
-      this.view = view;
+    @Override
+    public PCollection<Iterable<T>> apply(PCollection<T> input) {
+      return input
+          .apply("GloballyWindow", Window.<T>into(new GlobalWindows()))
+          .apply("DummyKey", WithKeys.<Integer, T>of(0))
+          .apply("GroupByKey", GroupByKey.<Integer, T>create())
+          .apply("GetOnlyValue", Values.<Iterable<T>>create());
+    }
+  }
+
+  /**
+   * A transform that applies an assertion-checking function over iterables of {@code T} to the
+   * entirety of the contents of its input.
+   */
+  public static class GroupThenAssert<T> extends PTransform<PCollection<T>, PDone>
+      implements Serializable {
+    private final SerializableFunction<Iterable<T>, Void> checkerFn;
+
+    private GroupThenAssert(SerializableFunction<Iterable<T>, Void> checkerFn) {
+      this.checkerFn = checkerFn;
     }
 
     @Override
-    public PCollectionView<T> apply(PBegin input) {
-      return view;
+    public PDone apply(PCollection<T> input) {
+      input
+          .apply("GroupGlobally", new GroupGlobally<T>())
+          .apply(
+              "RunChecks",
+              ParDo.of(
+                  new DoFn<Iterable<T>, Void>() {
+                    @Override
+                    public void processElement(ProcessContext context) {
+                      checkerFn.apply(context.element());
+                    }
+                  }));
+
+      return PDone.in(input.getPipeline());
     }
   }
 
   /**
-   * An assertion checker that takes a single
-   * {@link PCollectionView PCollectionView&lt;ActualT&gt;}
-   * and an assertion over {@code ActualT}, and checks it within a dataflow
-   * pipeline.
+   * A transform that applies an assertion-checking function to a single iterable contained as the
+   * sole element of a {@link PCollection}.
+   */
+  public static class GroupThenAssertForSingleton<T>
+      extends PTransform<PCollection<Iterable<T>>, PDone> implements Serializable {
+    private final SerializableFunction<Iterable<T>, Void> checkerFn;
+
+    private GroupThenAssertForSingleton(SerializableFunction<Iterable<T>, Void> checkerFn) {
+      this.checkerFn = checkerFn;
+    }
+
+    @Override
+    public PDone apply(PCollection<Iterable<T>> input) {
+      input
+          .apply("GroupGlobally", new GroupGlobally<Iterable<T>>())
+          .apply(
+              "RunChecks",
+              ParDo.of(
+                  new DoFn<Iterable<Iterable<T>>, Void>() {
+                    @Override
+                    public void processElement(ProcessContext context) {
+                      checkerFn.apply(Iterables.getOnlyElement(context.element()));
+                    }
+                  }));
+
+      return PDone.in(input.getPipeline());
+    }
+  }
+
+  /**
+   * An assertion checker that takes a single {@link PCollectionView
+   * PCollectionView&lt;ActualT&gt;} and an assertion over {@code ActualT}, and checks it within a
+   * Beam pipeline.
    *
-   * <p>Note that the entire assertion must be serializable. If
-   * you need to make assertions involving multiple inputs
-   * that are each not serializable, use TwoSideInputAssert.
+   * <p>Note that the entire assertion must be serializable.
    *
-   * <p>This is generally useful for assertion functions that
-   * are serializable but whose underlying data may not have a coder.
+   * <p>This is generally useful for assertion functions that are serializable but whose underlying
+   * data may not have a coder.
    */
-  public static class OneSideInputAssert<ActualT>
-      extends PTransform<PBegin, PDone> implements Serializable {
+  public static class OneSideInputAssert<ActualT> extends PTransform<PBegin, PDone>
+      implements Serializable {
     private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
     private final SerializableFunction<ActualT, Void> checkerFn;
 
-    public OneSideInputAssert(
+    private OneSideInputAssert(
         PTransform<PBegin, PCollectionView<ActualT>> createActual,
         SerializableFunction<ActualT, Void> checkerFn) {
       this.createActual = createActual;
@@ -594,16 +656,18 @@ public class PAssert {
 
       input
           .apply(Create.of(0).withCoder(VarIntCoder.of()))
-          .apply(ParDo.named("RunChecks").withSideInputs(actual)
-              .of(new CheckerDoFn<>(checkerFn, actual)));
+          .apply(
+              ParDo.named("RunChecks")
+                  .withSideInputs(actual)
+                  .of(new CheckerDoFn<>(checkerFn, actual)));
 
       return PDone.in(input.getPipeline());
     }
   }
 
   /**
-   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
-   * a {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
+   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of a
+   * {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
    *
    * <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support
    * null values.
@@ -617,8 +681,7 @@ public class PAssert {
     private final PCollectionView<ActualT> actual;
 
     private CheckerDoFn(
-        SerializableFunction<ActualT, Void> checkerFn,
-        PCollectionView<ActualT> actual) {
+        SerializableFunction<ActualT, Void> checkerFn, PCollectionView<ActualT> actual) {
       this.checkerFn = checkerFn;
       this.actual = actual;
     }
@@ -640,88 +703,11 @@ public class PAssert {
     }
   }
 
-  /**
-   * An assertion checker that takes a {@link PCollectionView PCollectionView&lt;ActualT&gt;},
-   * a {@link PCollectionView PCollectionView&lt;ExpectedT&gt;}, a relation
-   * over {@code A} and {@code B}, and checks that the relation holds
-   * within a dataflow pipeline.
-   *
-   * <p>This is useful when either/both of {@code A} and {@code B}
-   * are not serializable, but have coders (provided
-   * by the underlying {@link PCollection}s).
-   */
-  public static class TwoSideInputAssert<ActualT, ExpectedT>
-      extends PTransform<PBegin, PDone> implements Serializable {
-
-    private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
-    private final transient PTransform<PBegin, PCollectionView<ExpectedT>> createExpected;
-    private final AssertRelation<ActualT, ExpectedT> relation;
-
-    protected TwoSideInputAssert(
-        PTransform<PBegin, PCollectionView<ActualT>> createActual,
-        PTransform<PBegin, PCollectionView<ExpectedT>> createExpected,
-        AssertRelation<ActualT, ExpectedT> relation) {
-      this.createActual = createActual;
-      this.createExpected = createExpected;
-      this.relation = relation;
-    }
-
-    @Override
-    public PDone apply(PBegin input) {
-      final PCollectionView<ActualT> actual = input.apply("CreateActual", createActual);
-      final PCollectionView<ExpectedT> expected = input.apply("CreateExpected", createExpected);
-
-      input
-          .apply(Create.of(0).withCoder(VarIntCoder.of()))
-          .apply("RunChecks", ParDo.withSideInputs(actual, expected)
-              .of(new CheckerDoFn<>(relation, actual, expected)));
-
-      return PDone.in(input.getPipeline());
-    }
-
-    /**
-     * Input is ignored, but is {@link Integer} for runners that do not support null values.
-     */
-    private static class CheckerDoFn<ActualT, ExpectedT> extends DoFn<Integer, Void> {
-      private final Aggregator<Integer, Integer> success =
-          createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
-      private final Aggregator<Integer, Integer> failure =
-          createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
-      private final AssertRelation<ActualT, ExpectedT> relation;
-      private final PCollectionView<ActualT> actual;
-      private final PCollectionView<ExpectedT> expected;
-
-      private CheckerDoFn(AssertRelation<ActualT, ExpectedT> relation,
-          PCollectionView<ActualT> actual, PCollectionView<ExpectedT> expected) {
-        this.relation = relation;
-        this.actual = actual;
-        this.expected = expected;
-      }
-
-      @Override
-      public void processElement(ProcessContext c) {
-        try {
-          ActualT actualContents = c.sideInput(actual);
-          ExpectedT expectedContents = c.sideInput(expected);
-          relation.assertFor(expectedContents).apply(actualContents);
-          success.addValue(1);
-        } catch (Throwable t) {
-          LOG.error("PAssert failed expectations.", t);
-          failure.addValue(1);
-          // TODO: allow for metrics to propagate on failure when running a streaming pipeline
-          if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
-            throw t;
-          }
-        }
-      }
-    }
-  }
-
   /////////////////////////////////////////////////////////////////////////////
 
   /**
-   * A {@link SerializableFunction} that verifies that an actual value is equal to an
-   * expected value.
+   * A {@link SerializableFunction} that verifies that an actual value is equal to an expected
+   * value.
    */
   private static class AssertIsEqualTo<T> implements SerializableFunction<T, Void> {
     private T expected;
@@ -738,8 +724,8 @@ public class PAssert {
   }
 
   /**
-   * A {@link SerializableFunction} that verifies that an actual value is not equal to an
-   * expected value.
+   * A {@link SerializableFunction} that verifies that an actual value is not equal to an expected
+   * value.
    */
   private static class AssertNotEqualTo<T> implements SerializableFunction<T, Void> {
     private T expected;
@@ -756,8 +742,8 @@ public class PAssert {
   }
 
   /**
-   * A {@link SerializableFunction} that verifies that an {@code Iterable} contains
-   * expected items in any order.
+   * A {@link SerializableFunction} that verifies that an {@code Iterable} contains expected items
+   * in any order.
    */
   private static class AssertContainsInAnyOrder<T>
       implements SerializableFunction<Iterable<T>, Void> {
@@ -787,10 +773,9 @@ public class PAssert {
   ////////////////////////////////////////////////////////////
 
   /**
-   * A binary predicate between types {@code Actual} and {@code Expected}.
-   * Implemented as a method {@code assertFor(Expected)} which returns
-   * a {@code SerializableFunction<Actual, Void>}
-   * that should verify the assertion..
+   * A binary predicate between types {@code Actual} and {@code Expected}. Implemented as a method
+   * {@code assertFor(Expected)} which returns a {@code SerializableFunction<Actual, Void>} that
+   * should verify the assertion.
    */
   private static interface AssertRelation<ActualT, ExpectedT> extends Serializable {
     public SerializableFunction<ActualT, Void> assertFor(ExpectedT input);
@@ -799,8 +784,7 @@ public class PAssert {
   /**
    * An {@link AssertRelation} implementing the binary predicate that two objects are equal.
    */
-  private static class AssertIsEqualToRelation<T>
-      implements AssertRelation<T, T> {
+  private static class AssertIsEqualToRelation<T> implements AssertRelation<T, T> {
     @Override
     public SerializableFunction<T, Void> assertFor(T expected) {
       return new AssertIsEqualTo<T>(expected);
@@ -810,8 +794,7 @@ public class PAssert {
   /**
    * An {@link AssertRelation} implementing the binary predicate that two objects are not equal.
    */
-  private static class AssertNotEqualToRelation<T>
-      implements AssertRelation<T, T> {
+  private static class AssertNotEqualToRelation<T> implements AssertRelation<T, T> {
     @Override
     public SerializableFunction<T, Void> assertFor(T expected) {
       return new AssertNotEqualTo<T>(expected);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/810ffeb2/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index f540948..fdc8719 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.sdk.testing;
 
-import static org.apache.beam.sdk.testing.SerializableMatchers.anything;
-import static org.apache.beam.sdk.testing.SerializableMatchers.not;
-
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -151,30 +148,6 @@ public class PAssertTest implements Serializable {
   }
 
   /**
-   * Basic test of succeeding {@link PAssert} using a {@link SerializableMatcher}.
-   */
-  @Test
-  @Category(RunnableOnService.class)
-  public void testBasicMatcherSuccess() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-    PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
-    PAssert.that(pcollection).containsInAnyOrder(anything());
-    pipeline.run();
-  }
-
-  /**
-   * Basic test of failing {@link PAssert} using a {@link SerializableMatcher}.
-   */
-  @Test
-  @Category(RunnableOnService.class)
-  public void testBasicMatcherFailure() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-    PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
-    PAssert.that(pcollection).containsInAnyOrder(not(anything()));
-    runExpectingAssertionFailure(pipeline);
-  }
-
-  /**
    * Test that we throw an error at pipeline construction time when the user mistakenly uses
    * {@code PAssert.thatSingleton().equals()} instead of the test method {@code .isEqualTo}.
    */



[2/4] incubator-beam git commit: Fix TriggerExampleTest

Posted by ke...@apache.org.
Fix TriggerExampleTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77aa0938
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77aa0938
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77aa0938

Branch: refs/heads/master
Commit: 77aa0938f75f9f3d18a4fa79a5ffe6159167f4d5
Parents: 810ffeb
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jun 8 16:22:34 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 9 14:41:09 2016 -0700

----------------------------------------------------------------------
 .../examples/cookbook/TriggerExampleTest.java   | 61 +++++++++++++-------
 1 file changed, 41 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77aa0938/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
index fe75d14..cddce7f 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
@@ -34,6 +34,8 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 
 import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -44,7 +46,9 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Unit Tests for {@link TriggerExample}.
@@ -70,21 +74,27 @@ public class TriggerExampleTest {
           + "0.001,74.8,1,9,3,0.0028,71,1,9,12,0.0099,97.4,1,9,13,0.0121,50.0,1,,,,,0,,,,,0"
           + ",,,,,0,,,,,0", new Instant(1)));
 
-  private static final TableRow OUT_ROW_1 = new TableRow()
-      .set("trigger_type", "default")
-      .set("freeway", "5").set("total_flow", 30)
-      .set("number_of_records", 1)
-      .set("isFirst", true).set("isLast", true)
-      .set("timing", "ON_TIME")
-      .set("window", "[1970-01-01T00:01:00.000Z..1970-01-01T00:02:00.000Z)");
-
-  private static final TableRow OUT_ROW_2 = new TableRow()
-      .set("trigger_type", "default")
-      .set("freeway", "110").set("total_flow", 90)
-      .set("number_of_records", 2)
-      .set("isFirst", true).set("isLast", true)
-      .set("timing", "ON_TIME")
-      .set("window", "[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)");
+  private static final TableRow OUT_ROW_1 =
+      new TableRow()
+          .set("trigger_type", "default")
+          .set("freeway", "5")
+          .set("total_flow", 30)
+          .set("number_of_records", 1)
+          .set("isFirst", true)
+          .set("isLast", true)
+          .set("timing", "ON_TIME")
+          .set("window", "[1970-01-01T00:01:00.000Z..1970-01-01T00:02:00.000Z)");
+
+  private static final TableRow OUT_ROW_2 =
+      new TableRow()
+          .set("trigger_type", "default")
+          .set("freeway", "110")
+          .set("total_flow", 90)
+          .set("number_of_records", 2)
+          .set("isFirst", true)
+          .set("isLast", true)
+          .set("timing", "ON_TIME")
+          .set("window", "[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)");
 
   @Test
   public void testExtractTotalFlow() throws Exception {
@@ -112,15 +122,26 @@ public class TriggerExampleTest {
         .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(1))))
         .apply(new TotalFlow("default"));
 
-    PCollection<TableRow> results =  totalFlow.apply(ParDo.of(new FormatResults()));
+    PCollection<String> results =  totalFlow.apply(ParDo.of(new FormatResults()));
 
-
-    PAssert.that(results).containsInAnyOrder(OUT_ROW_1, OUT_ROW_2);
+    PAssert.that(results)
+        .containsInAnyOrder(canonicalFormat(OUT_ROW_1), canonicalFormat(OUT_ROW_2));
     pipeline.run();
 
   }
 
-  static class FormatResults extends DoFn<TableRow, TableRow> {
+  // Renders a TableRow as its "key:value" entries, sorted and comma-joined. TableRow's API is
+  // dynamically typed, so equals()/hashCode() are not appropriate for matching rows in tests.
+  static String canonicalFormat(TableRow row) {
+    List<String> entries = Lists.newArrayListWithCapacity(row.size());
+    for (Map.Entry<String, Object> entry : row.entrySet()) {
+      entries.add(entry.getKey() + ":" + entry.getValue());
+    }
+    Collections.sort(entries);
+    return Joiner.on(",").join(entries);
+  }
+
+  static class FormatResults extends DoFn<TableRow, String> {
     @Override
     public void processElement(ProcessContext c) throws Exception {
       TableRow element = c.element();
@@ -133,7 +154,7 @@ public class TriggerExampleTest {
           .set("isLast", element.get("isLast"))
           .set("timing", element.get("timing"))
           .set("window", element.get("window"));
-      c.output(row);
+      c.output(canonicalFormat(row));
     }
   }
 }