You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/14 15:52:24 UTC

[1/2] incubator-beam git commit: Revert GBK-based PAssert

Repository: incubator-beam
Updated Branches:
  refs/heads/master f9a9214dd -> c8ad2e7dd


Revert GBK-based PAssert

This changed neglected the use of counters by the Dataflow runner,
which is used to prevent tests for spuriously passing when
a PCollection is empty.

Obvious fixes for that revealed probable bugs in the in-process
and Spark runner, as well as tests that happen to work with
PAssert but are actually unsupported.

A proper long-term fix is underway to address all of the above.
In the meantime, this commit rolls back the changes to PAssert.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/045b568f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/045b568f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/045b568f

Branch: refs/heads/master
Commit: 045b568f6be4b7b010d4fd4cfdd1536db943ce54
Parents: f9a9214
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 14 08:05:04 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 14 08:05:41 2016 -0700

----------------------------------------------------------------------
 .../testing/TestDataflowPipelineRunner.java     |   3 +-
 .../org/apache/beam/sdk/testing/PAssert.java    | 779 +++++++++----------
 .../apache/beam/sdk/testing/PAssertTest.java    |  27 +
 3 files changed, 396 insertions(+), 413 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045b568f/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
index c940e9a..3e8d903 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
@@ -166,8 +166,7 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
   public <OutputT extends POutput, InputT extends PInput> OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
     if (transform instanceof PAssert.OneSideInputAssert
-        || transform instanceof PAssert.GroupThenAssert
-        || transform instanceof PAssert.GroupThenAssertForSingleton) {
+        || transform instanceof PAssert.TwoSideInputAssert) {
       expectedNumberOfAssertions += 1;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045b568f/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 62d3599..c2cd598 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -34,14 +34,11 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -51,27 +48,32 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
 /**
- * An assertion on the contents of a {@link PCollection} incorporated into the pipeline. Such an
- * assertion can be checked no matter what kind of {@link PipelineRunner} is used.
+ * An assertion on the contents of a {@link PCollection}
+ * incorporated into the pipeline.  Such an assertion
+ * can be checked no matter what kind of {@link PipelineRunner} is
+ * used.
  *
- * <p>Note that the {@code PAssert} call must precede the call to {@link Pipeline#run}.
+ * <p>Note that the {@code PAssert} call must precede the call
+ * to {@link Pipeline#run}.
  *
- * <p>Examples of use: <pre>{@code
+ * <p>Examples of use:
+ * <pre>{@code
  * Pipeline p = TestPipeline.create();
  * ...
  * PCollection<String> output =
@@ -105,84 +107,30 @@ public class PAssert {
   private PAssert() {}
 
   /**
-   * Builder interface for assertions applicable to iterables and PCollection contents.
-   */
-  public interface IterableAssert<T> {
-
-    /**
-     * Asserts that the iterable in question contains the provided elements.
-     *
-     * @return the same {@link IterableAssert} builder for further assertions
-     */
-    IterableAssert<T> containsInAnyOrder(T... expectedElements);
-
-    /**
-     * Asserts that the iterable in question contains the provided elements.
-     *
-     * @return the same {@link IterableAssert} builder for further assertions
-     */
-    IterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements);
-
-    /**
-     * Asserts that the iterable in question is empty.
-     *
-     * @return the same {@link IterableAssert} builder for further assertions
-     */
-    IterableAssert<T> empty();
-
-    /**
-     * Applies the provided checking function (presumably containing assertions) to the
-     * iterable in question.
-     *
-     * @return the same {@link IterableAssert} builder for further assertions
-     */
-    IterableAssert<T> satisfies(SerializableFunction<Iterable<T>, Void> checkerFn);
-  }
-
-  /**
-   * Builder interface for assertions applicable to a single value.
-   */
-  public interface SingletonAssert<T> {
-    /**
-     * Asserts that the value in question is equal to the provided value, according to
-     * {@link Object#equals}.
-     *
-     * @return the same {@link SingletonAssert} builder for further assertions
-     */
-    SingletonAssert<T> isEqualTo(T expected);
-
-    /**
-     * Asserts that the value in question is not equal to the provided value, according
-     * to {@link Object#equals}.
-     *
-     * @return the same {@link SingletonAssert} builder for further assertions
-     */
-    SingletonAssert<T> notEqualTo(T notExpected);
-
-    /**
-     * Applies the provided checking function (presumably containing assertions) to the
-     * value in question.
-     *
-     * @return the same {@link SingletonAssert} builder for further assertions
-     */
-    SingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn);
-  }
-
-  /**
-   * Constructs an {@link IterableAssert} for the elements of the provided {@link PCollection}.
+   * Constructs an {@link IterableAssert} for the elements of the provided
+   * {@link PCollection}.
    */
   public static <T> IterableAssert<T> that(PCollection<T> actual) {
-    return new PCollectionContentsAssert<>(actual);
+    return new IterableAssert<>(
+        new CreateActual<T, Iterable<T>>(actual, View.<T>asIterable()),
+         actual.getPipeline())
+         .setCoder(actual.getCoder());
   }
 
   /**
-   * Constructs an {@link IterableAssert} for the value of the provided {@link PCollection} which
-   * must contain a single {@code Iterable<T>} value.
+   * Constructs an {@link IterableAssert} for the value of the provided
+   * {@link PCollection} which must contain a single {@code Iterable<T>}
+   * value.
    */
-  public static <T> IterableAssert<T> thatSingletonIterable(
-      PCollection<? extends Iterable<T>> actual) {
+  public static <T> IterableAssert<T>
+      thatSingletonIterable(PCollection<? extends Iterable<T>> actual) {
 
+    List<? extends Coder<?>> maybeElementCoder = actual.getCoder().getCoderArguments();
+    Coder<T> tCoder;
     try {
+      @SuppressWarnings("unchecked")
+      Coder<T> tCoderTmp = (Coder<T>) Iterables.getOnlyElement(maybeElementCoder);
+      tCoder = tCoderTmp;
     } catch (NoSuchElementException | IllegalArgumentException exc) {
       throw new IllegalArgumentException(
           "PAssert.<T>thatSingletonIterable requires a PCollection<Iterable<T>>"
@@ -193,7 +141,19 @@ public class PAssert {
     @SuppressWarnings("unchecked") // Safe covariant cast
     PCollection<Iterable<T>> actualIterables = (PCollection<Iterable<T>>) actual;
 
-    return new PCollectionSingletonIterableAssert<>(actualIterables);
+    return new IterableAssert<>(
+        new CreateActual<Iterable<T>, Iterable<T>>(
+            actualIterables, View.<Iterable<T>>asSingleton()),
+        actual.getPipeline())
+        .setCoder(tCoder);
+  }
+
+  /**
+   * Constructs an {@link IterableAssert} for the value of the provided
+   * {@code PCollectionView PCollectionView<Iterable<T>>}.
+   */
+  public static <T> IterableAssert<T> thatIterable(PCollectionView<Iterable<T>> actual) {
+    return new IterableAssert<>(new PreExisting<Iterable<T>>(actual), actual.getPipeline());
   }
 
   /**
@@ -201,95 +161,93 @@ public class PAssert {
    * {@code PCollection PCollection<T>}, which must be a singleton.
    */
   public static <T> SingletonAssert<T> thatSingleton(PCollection<T> actual) {
-    return new PCollectionViewAssert<>(actual, View.<T>asSingleton(), actual.getCoder());
+    return new SingletonAssert<>(
+        new CreateActual<T, T>(actual, View.<T>asSingleton()), actual.getPipeline())
+        .setCoder(actual.getCoder());
   }
 
   /**
    * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}.
    *
-   * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any
-   * {@code Coder<K, V>}.
+   * <p>Note that the actual value must be coded by a {@link KvCoder},
+   * not just any {@code Coder<K, V>}.
    */
-  public static <K, V> SingletonAssert<Map<K, Iterable<V>>> thatMultimap(
-      PCollection<KV<K, V>> actual) {
+  public static <K, V> SingletonAssert<Map<K, Iterable<V>>>
+      thatMultimap(PCollection<KV<K, V>> actual) {
     @SuppressWarnings("unchecked")
     KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
-    return new PCollectionViewAssert<>(
-        actual,
-        View.<K, V>asMultimap(),
-        MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder())));
+
+    return new SingletonAssert<>(
+        new CreateActual<>(actual, View.<K, V>asMultimap()), actual.getPipeline())
+        .setCoder(MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder())));
   }
 
   /**
-   * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}, which
-   * must have at most one value per key.
+   * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection},
+   * which must have at most one value per key.
    *
-   * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any
-   * {@code Coder<K, V>}.
+   * <p>Note that the actual value must be coded by a {@link KvCoder},
+   * not just any {@code Coder<K, V>}.
    */
   public static <K, V> SingletonAssert<Map<K, V>> thatMap(PCollection<KV<K, V>> actual) {
     @SuppressWarnings("unchecked")
     KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
-    return new PCollectionViewAssert<>(
-        actual, View.<K, V>asMap(), MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()));
+
+    return new SingletonAssert<>(
+        new CreateActual<>(actual, View.<K, V>asMap()), actual.getPipeline())
+        .setCoder(MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()));
   }
 
   ////////////////////////////////////////////////////////////
 
   /**
-   * An {@link IterableAssert} about the contents of a {@link PCollection}. This does not require
-   * the runner to support side inputs.
+   * An assertion about the contents of a {@link PCollectionView} yielding an {@code Iterable<T>}.
    */
-  private static class PCollectionContentsAssert<T> implements IterableAssert<T> {
-    private final PCollection<T> actual;
+  public static class IterableAssert<T> implements Serializable {
+    private final Pipeline pipeline;
+    private final PTransform<PBegin, PCollectionView<Iterable<T>>> createActual;
+    private Optional<Coder<T>> coder;
 
-    public PCollectionContentsAssert(PCollection<T> actual) {
-      this.actual = actual;
+    protected IterableAssert(
+        PTransform<PBegin, PCollectionView<Iterable<T>>> createActual, Pipeline pipeline) {
+      this.createActual = createActual;
+      this.pipeline = pipeline;
+      this.coder = Optional.absent();
     }
 
     /**
-     * Checks that the {@code Iterable} contains the expected elements, in any order.
+     * Sets the coder to use for elements of type {@code T}, as needed for internal purposes.
      *
      * <p>Returns this {@code IterableAssert}.
      */
-    @Override
-    @SafeVarargs
-    public final PCollectionContentsAssert<T> containsInAnyOrder(T... expectedElements) {
-      return containsInAnyOrder(Arrays.asList(expectedElements));
+    public IterableAssert<T> setCoder(Coder<T> coderOrNull) {
+      this.coder = Optional.fromNullable(coderOrNull);
+      return this;
     }
 
     /**
-     * Checks that the {@code Iterable} contains the expected elements, in any order.
-     *
-     * <p>Returns this {@code IterableAssert}.
+     * Gets the coder, which may yet be absent.
      */
-    @Override
-    public PCollectionContentsAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
-      return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
-    }
-
-    @Override
-    public PCollectionContentsAssert<T> empty() {
-      return containsInAnyOrder(Collections.<T>emptyList());
-    }
-
-    @Override
-    public PCollectionContentsAssert<T> satisfies(
-        SerializableFunction<Iterable<T>, Void> checkerFn) {
-      actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn));
-      return this;
+    public Coder<T> getCoder() {
+      if (coder.isPresent()) {
+        return coder.get();
+      } else {
+        throw new IllegalStateException(
+            "Attempting to access the coder of an IterableAssert"
+                + " that has not been set yet.");
+      }
     }
 
     /**
-     * Checks that the {@code Iterable} contains elements that match the provided matchers, in any
-     * order.
+     * Applies a {@link SerializableFunction} to check the elements of the {@code Iterable}.
      *
      * <p>Returns this {@code IterableAssert}.
      */
-    @SafeVarargs
-    final PCollectionContentsAssert<T> containsInAnyOrder(
-        SerializableMatcher<? super T>... elementMatchers) {
-      return satisfies(SerializableMatchers.<T>containsInAnyOrder(elementMatchers));
+    public IterableAssert<T> satisfies(SerializableFunction<Iterable<T>, Void> checkerFn) {
+      pipeline.apply(
+          "PAssert$" + (assertCount++),
+          new OneSideInputAssert<Iterable<T>>(createActual, checkerFn));
+      return this;
     }
 
     /**
@@ -297,11 +255,17 @@ public class PAssert {
      *
      * <p>Returns this {@code IterableAssert}.
      */
-    private PCollectionContentsAssert<T> satisfies(
-        AssertRelation<Iterable<T>, Iterable<T>> relation, Iterable<T> expectedElements) {
-      return satisfies(
-          new CheckRelationAgainstExpected<Iterable<T>>(
-              relation, expectedElements, IterableCoder.of(actual.getCoder())));
+    public IterableAssert<T> satisfies(
+        AssertRelation<Iterable<T>, Iterable<T>> relation,
+        final Iterable<T> expectedElements) {
+      pipeline.apply(
+          "PAssert$" + (assertCount++),
+          new TwoSideInputAssert<Iterable<T>, Iterable<T>>(
+              createActual,
+              new CreateExpected<T, Iterable<T>>(expectedElements, coder, View.<T>asIterable()),
+              relation));
+
+      return this;
     }
 
     /**
@@ -309,14 +273,15 @@ public class PAssert {
      *
      * <p>Returns this {@code IterableAssert}.
      */
-    PCollectionContentsAssert<T> satisfies(
-        final SerializableMatcher<Iterable<? extends T>> matcher) {
+    IterableAssert<T> satisfies(final SerializableMatcher<Iterable<? extends T>> matcher) {
       // Safe covariant cast. Could be elided by changing a lot of this file to use
       // more flexible bounds.
       @SuppressWarnings({"rawtypes", "unchecked"})
       SerializableFunction<Iterable<T>, Void> checkerFn =
-          (SerializableFunction) new MatcherCheckerFn<>(matcher);
-      actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn));
+        (SerializableFunction) new MatcherCheckerFn<>(matcher);
+      pipeline.apply(
+          "PAssert$" + (assertCount++),
+          new OneSideInputAssert<Iterable<T>>(createActual, checkerFn));
       return this;
     }
 
@@ -335,9 +300,19 @@ public class PAssert {
     }
 
     /**
+     * Checks that the {@code Iterable} is empty.
+     *
+     * <p>Returns this {@code IterableAssert}.
+     */
+    public IterableAssert<T> empty() {
+      return satisfies(new AssertContainsInAnyOrderRelation<T>(), Collections.<T>emptyList());
+    }
+
+    /**
      * @throws UnsupportedOperationException always
-     * @deprecated {@link Object#equals(Object)} is not supported on PAssert objects. If you meant
-     * to test object equality, use a variant of {@link #containsInAnyOrder} instead.
+     * @deprecated {@link Object#equals(Object)} is not supported on PAssert objects.
+     *    If you meant to test object equality, use a variant of {@link #containsInAnyOrder}
+     *    instead.
      */
     @Deprecated
     @Override
@@ -356,129 +331,169 @@ public class PAssert {
       throw new UnsupportedOperationException(
           String.format("%s.hashCode() is not supported.", IterableAssert.class.getSimpleName()));
     }
-  }
 
-  /**
-   * An {@link IterableAssert} for an iterable that is the sole element of a {@link PCollection}.
-   * This does not require the runner to support side inputs.
-   */
-  private static class PCollectionSingletonIterableAssert<T> implements IterableAssert<T> {
-    private final PCollection<Iterable<T>> actual;
-    private final Coder<T> elementCoder;
-
-    public PCollectionSingletonIterableAssert(PCollection<Iterable<T>> actual) {
-      this.actual = actual;
+    /**
+     * Checks that the {@code Iterable} contains the expected elements, in any
+     * order.
+     *
+     * <p>Returns this {@code IterableAssert}.
+     */
+    public IterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
+      return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
+    }
 
-      @SuppressWarnings("unchecked")
-      Coder<T> typedCoder = (Coder<T>) actual.getCoder().getCoderArguments().get(0);
-      this.elementCoder = typedCoder;
+    /**
+     * Checks that the {@code Iterable} contains the expected elements, in any
+     * order.
+     *
+     * <p>Returns this {@code IterableAssert}.
+     */
+    @SafeVarargs
+    public final IterableAssert<T> containsInAnyOrder(T... expectedElements) {
+      return satisfies(
+        new AssertContainsInAnyOrderRelation<T>(),
+        Arrays.asList(expectedElements));
     }
 
-    @Override
+    /**
+     * Checks that the {@code Iterable} contains elements that match the provided matchers,
+     * in any order.
+     *
+     * <p>Returns this {@code IterableAssert}.
+     */
     @SafeVarargs
-    public final PCollectionSingletonIterableAssert<T> containsInAnyOrder(T... expectedElements) {
-      return containsInAnyOrder(Arrays.asList(expectedElements));
+    final IterableAssert<T> containsInAnyOrder(
+        SerializableMatcher<? super T>... elementMatchers) {
+      return satisfies(SerializableMatchers.<T>containsInAnyOrder(elementMatchers));
     }
+  }
 
-    @Override
-    public PCollectionSingletonIterableAssert<T> empty() {
-      return containsInAnyOrder(Collections.<T>emptyList());
+  /**
+   * An assertion about the single value of type {@code T}
+   * associated with a {@link PCollectionView}.
+   */
+  public static class SingletonAssert<T> implements Serializable {
+    private final Pipeline pipeline;
+    private final CreateActual<?, T> createActual;
+    private Optional<Coder<T>> coder;
+
+    protected SingletonAssert(
+        CreateActual<?, T> createActual, Pipeline pipeline) {
+      this.pipeline = pipeline;
+      this.createActual = createActual;
+      this.coder = Optional.absent();
     }
 
+    /**
+     * Always throws an {@link UnsupportedOperationException}: users are probably looking for
+     * {@link #isEqualTo}.
+     */
+    @Deprecated
     @Override
-    public PCollectionSingletonIterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
-      return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
+    public boolean equals(Object o) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "tests for Java equality of the %s object, not the PCollection in question. "
+                  + "Call a test method, such as isEqualTo.",
+              getClass().getSimpleName()));
     }
 
+    /**
+     * @throws UnsupportedOperationException always.
+     * @deprecated {@link Object#hashCode()} is not supported on PAssert objects.
+     */
+    @Deprecated
     @Override
-    public PCollectionSingletonIterableAssert<T> satisfies(
-        SerializableFunction<Iterable<T>, Void> checkerFn) {
-      actual.apply("PAssert$" + (assertCount++), new GroupThenAssertForSingleton<>(checkerFn));
-      return this;
+    public int hashCode() {
+      throw new UnsupportedOperationException(
+          String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName()));
     }
 
-    private PCollectionSingletonIterableAssert<T> satisfies(
-        AssertRelation<Iterable<T>, Iterable<T>> relation, Iterable<T> expectedElements) {
-      return satisfies(
-          new CheckRelationAgainstExpected<Iterable<T>>(
-              relation, expectedElements, IterableCoder.of(elementCoder)));
+    /**
+     * Sets the coder to use for elements of type {@code T}, as needed
+     * for internal purposes.
+     *
+     * <p>Returns this {@code IterableAssert}.
+     */
+    public SingletonAssert<T> setCoder(Coder<T> coderOrNull) {
+      this.coder = Optional.fromNullable(coderOrNull);
+      return this;
     }
-  }
 
-  /**
-   * An assertion about the contents of a {@link PCollection} when it is viewed as a single value
-   * of type {@code ViewT}. This requires side input support from the runner.
-   */
-  private static class PCollectionViewAssert<ElemT, ViewT> implements SingletonAssert<ViewT> {
-    private final PCollection<ElemT> actual;
-    private final PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view;
-    private final Coder<ViewT> coder;
-
-    protected PCollectionViewAssert(
-        PCollection<ElemT> actual,
-        PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view,
-        Coder<ViewT> coder) {
-      this.actual = actual;
-      this.view = view;
-      this.coder = coder;
+    /**
+     * Gets the coder, which may yet be absent.
+     */
+    public Coder<T> getCoder() {
+      if (coder.isPresent()) {
+        return coder.get();
+      } else {
+        throw new IllegalStateException(
+            "Attempting to access the coder of an IterableAssert that has not been set yet.");
+      }
     }
 
-    @Override
-    public PCollectionViewAssert<ElemT, ViewT> isEqualTo(ViewT expectedValue) {
-      return satisfies(new AssertIsEqualToRelation<ViewT>(), expectedValue);
+    /**
+     * Applies a {@link SerializableFunction} to check the value of this
+     * {@code SingletonAssert}'s view.
+     *
+     * <p>Returns this {@code SingletonAssert}.
+     */
+    public SingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn) {
+      pipeline.apply(
+          "PAssert$" + (assertCount++), new OneSideInputAssert<T>(createActual, checkerFn));
+      return this;
     }
 
-    @Override
-    public PCollectionViewAssert<ElemT, ViewT> notEqualTo(ViewT expectedValue) {
-      return satisfies(new AssertNotEqualToRelation<ViewT>(), expectedValue);
-    }
+    /**
+     * Applies an {@link AssertRelation} to check the provided relation against the
+     * value of this assert and the provided expected value.
+     *
+     * <p>Returns this {@code SingletonAssert}.
+     */
+    public SingletonAssert<T> satisfies(
+        AssertRelation<T, T> relation,
+        final T expectedValue) {
+      pipeline.apply(
+          "PAssert$" + (assertCount++),
+          new TwoSideInputAssert<T, T>(
+              createActual,
+              new CreateExpected<T, T>(Arrays.asList(expectedValue), coder, View.<T>asSingleton()),
+              relation));
 
-    @Override
-    public PCollectionViewAssert<ElemT, ViewT> satisfies(
-        SerializableFunction<ViewT, Void> checkerFn) {
-      actual
-          .getPipeline()
-          .apply(
-              "PAssert$" + (assertCount++),
-              new OneSideInputAssert<ViewT>(CreateActual.from(actual, view), checkerFn));
       return this;
     }
 
     /**
-     * Applies an {@link AssertRelation} to check the provided relation against the value of this
-     * assert and the provided expected value.
+     * Checks that the value of this {@code SingletonAssert}'s view is equal
+     * to the expected value.
      *
      * <p>Returns this {@code SingletonAssert}.
      */
-    private PCollectionViewAssert<ElemT, ViewT> satisfies(
-        AssertRelation<ViewT, ViewT> relation, final ViewT expectedValue) {
-      return satisfies(new CheckRelationAgainstExpected<ViewT>(relation, expectedValue, coder));
+    public SingletonAssert<T> isEqualTo(T expectedValue) {
+      return satisfies(new AssertIsEqualToRelation<T>(), expectedValue);
     }
 
     /**
-     * Always throws an {@link UnsupportedOperationException}: users are probably looking for
-     * {@link #isEqualTo}.
+     * Checks that the value of this {@code SingletonAssert}'s view is not equal
+     * to the expected value.
+     *
+     * <p>Returns this {@code SingletonAssert}.
      */
-    @Deprecated
-    @Override
-    public boolean equals(Object o) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "tests for Java equality of the %s object, not the PCollection in question. "
-                  + "Call a test method, such as isEqualTo.",
-              getClass().getSimpleName()));
+    public SingletonAssert<T> notEqualTo(T expectedValue) {
+      return satisfies(new AssertNotEqualToRelation<T>(), expectedValue);
     }
 
     /**
-     * @throws UnsupportedOperationException always.
-     * @deprecated {@link Object#hashCode()} is not supported on {@link PAssert} objects.
+     * Checks that the value of this {@code SingletonAssert}'s view is equal to
+     * the expected value.
+     *
+     * @deprecated replaced by {@link #isEqualTo}
      */
     @Deprecated
-    @Override
-    public int hashCode() {
-      throw new UnsupportedOperationException(
-          String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName()));
+    public SingletonAssert<T> is(T expectedValue) {
+      return isEqualTo(expectedValue);
     }
+
   }
 
   ////////////////////////////////////////////////////////////////////////
@@ -489,13 +504,8 @@ public class PAssert {
     private final transient PCollection<T> actual;
     private final transient PTransform<PCollection<T>, PCollectionView<ActualT>> actualView;
 
-    public static <T, ActualT> CreateActual<T, ActualT> from(
-        PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
-      return new CreateActual<>(actual, actualView);
-    }
-
-    private CreateActual(
-        PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
+    private CreateActual(PCollection<T> actual,
+        PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
       this.actual = actual;
       this.actualView = actualView;
     }
@@ -505,129 +515,73 @@ public class PAssert {
       final Coder<T> coder = actual.getCoder();
       return actual
           .apply(Window.<T>into(new GlobalWindows()))
-          .apply(
-              ParDo.of(
-                  new DoFn<T, T>() {
-                    @Override
-                    public void processElement(ProcessContext context) throws CoderException {
-                      context.output(CoderUtils.clone(coder, context.element()));
-                    }
-                  }))
+          .apply(ParDo.of(new DoFn<T, T>() {
+            @Override
+            public void processElement(ProcessContext context) throws CoderException {
+              context.output(CoderUtils.clone(coder, context.element()));
+            }
+          }))
           .apply(actualView);
     }
   }
 
-  /**
-   * A partially applied {@link AssertRelation}, where one value is provided along with a coder to
-   * serialize/deserialize them.
-   */
-  private static class CheckRelationAgainstExpected<T> implements SerializableFunction<T, Void> {
-    private final AssertRelation<T, T> relation;
-    private final byte[] encodedExpected;
-    private final Coder<T> coder;
+  private static class CreateExpected<T, ExpectedT>
+      extends PTransform<PBegin, PCollectionView<ExpectedT>> {
 
-    public CheckRelationAgainstExpected(AssertRelation<T, T> relation, T expected, Coder<T> coder) {
-      this.relation = relation;
-      this.coder = coder;
+    private final Iterable<T> elements;
+    private final Optional<Coder<T>> coder;
+    private final transient PTransform<PCollection<T>, PCollectionView<ExpectedT>> view;
 
-      try {
-        this.encodedExpected = CoderUtils.encodeToByteArray(coder, expected);
-      } catch (IOException coderException) {
-        throw new RuntimeException(coderException);
-      }
+    private CreateExpected(Iterable<T> elements, Optional<Coder<T>> coder,
+        PTransform<PCollection<T>, PCollectionView<ExpectedT>> view) {
+      this.elements = elements;
+      this.coder = coder;
+      this.view = view;
     }
 
     @Override
-    public Void apply(T actual) {
-      try {
-        T expected = CoderUtils.decodeFromByteArray(coder, encodedExpected);
-        return relation.assertFor(expected).apply(actual);
-      } catch (IOException coderException) {
-        throw new RuntimeException(coderException);
+    public PCollectionView<ExpectedT> apply(PBegin input) {
+      Create.Values<T> createTransform = Create.<T>of(elements);
+      if (coder.isPresent()) {
+        createTransform = createTransform.withCoder(coder.get());
       }
+      return input.apply(createTransform).apply(view);
     }
   }
 
-  /**
-   * A transform that gathers the contents of a {@link PCollection} into a single main input
-   * iterable in the global window. This requires a runner to support {@link GroupByKey} in the
-   * global window, but not side inputs or other windowing or triggers.
-   */
-  private static class GroupGlobally<T> extends PTransform<PCollection<T>, PCollection<Iterable<T>>>
-      implements Serializable {
-
-    public GroupGlobally() {}
-
-    @Override
-    public PCollection<Iterable<T>> apply(PCollection<T> input) {
-      return input
-          .apply("GloballyWindow", Window.<T>into(new GlobalWindows()))
-          .apply("DummyKey", WithKeys.<Integer, T>of(0))
-          .apply("GroupByKey", GroupByKey.<Integer, T>create())
-          .apply("GetOnlyValue", Values.<Iterable<T>>create());
-    }
-  }
-
-  /**
-   * A transform that applies an assertion-checking function over iterables of {@code ActualT} to
-   * the entirety of the contents of its input.
-   */
-  public static class GroupThenAssert<T> extends PTransform<PCollection<T>, PDone>
-      implements Serializable {
-    private final SerializableFunction<Iterable<T>, Void> checkerFn;
-
-    private GroupThenAssert(SerializableFunction<Iterable<T>, Void> checkerFn) {
-      this.checkerFn = checkerFn;
-    }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      input
-          .apply("GroupGlobally", new GroupGlobally<T>())
-          .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn)));
+  private static class PreExisting<T> extends PTransform<PBegin, PCollectionView<T>> {
 
-      return PDone.in(input.getPipeline());
-    }
-  }
+    private final PCollectionView<T> view;
 
-  /**
-   * A transform that applies an assertion-checking function to a single iterable contained as the
-   * sole element of a {@link PCollection}.
-   */
-  public static class GroupThenAssertForSingleton<T>
-      extends PTransform<PCollection<Iterable<T>>, PDone> implements Serializable {
-    private final SerializableFunction<Iterable<T>, Void> checkerFn;
-
-    private GroupThenAssertForSingleton(SerializableFunction<Iterable<T>, Void> checkerFn) {
-      this.checkerFn = checkerFn;
+    private PreExisting(PCollectionView<T> view) {
+      this.view = view;
     }
 
     @Override
-    public PDone apply(PCollection<Iterable<T>> input) {
-      input
-          .apply("GroupGlobally", new GroupGlobally<Iterable<T>>())
-          .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn)));
-
-      return PDone.in(input.getPipeline());
+    public PCollectionView<T> apply(PBegin input) {
+      return view;
     }
   }
 
   /**
-   * An assertion checker that takes a single {@link PCollectionView
-   * PCollectionView&lt;ActualT&gt;} and an assertion over {@code ActualT}, and checks it within a
-   * Beam pipeline.
+   * An assertion checker that takes a single
+   * {@link PCollectionView PCollectionView&lt;ActualT&gt;}
+   * and an assertion over {@code ActualT}, and checks it within a dataflow
+   * pipeline.
    *
-   * <p>Note that the entire assertion must be serializable.
+   * <p>Note that the entire assertion must be serializable. If
+   * you need to make assertions involving multiple inputs
+   * that are each not serializable, use TwoSideInputAssert.
    *
-   * <p>This is generally useful for assertion functions that are serializable but whose underlying
-   * data may not have a coder.
+   * <p>This is generally useful for assertion functions that
+   * are serializable but whose underlying data may not have a coder.
    */
-  public static class OneSideInputAssert<ActualT> extends PTransform<PBegin, PDone>
-      implements Serializable {
+  public static class OneSideInputAssert<ActualT>
+      extends PTransform<PBegin, PDone> implements Serializable {
     private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
     private final SerializableFunction<ActualT, Void> checkerFn;
 
-    private OneSideInputAssert(
+    public OneSideInputAssert(
         PTransform<PBegin, PCollectionView<ActualT>> createActual,
         SerializableFunction<ActualT, Void> checkerFn) {
       this.createActual = createActual;
@@ -640,23 +594,21 @@ public class PAssert {
 
       input
           .apply(Create.of(0).withCoder(VarIntCoder.of()))
-          .apply(
-              ParDo.named("RunChecks")
-                  .withSideInputs(actual)
-                  .of(new SideInputCheckerDoFn<>(checkerFn, actual)));
+          .apply(ParDo.named("RunChecks").withSideInputs(actual)
+              .of(new CheckerDoFn<>(checkerFn, actual)));
 
       return PDone.in(input.getPipeline());
     }
   }
 
   /**
-   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of a
-   * {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
+   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
+   * a {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
    *
    * <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support
    * null values.
    */
-  private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> {
+  private static class CheckerDoFn<ActualT> extends DoFn<Integer, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
         createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -664,8 +616,9 @@ public class PAssert {
         createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
     private final PCollectionView<ActualT> actual;
 
-    private SideInputCheckerDoFn(
-        SerializableFunction<ActualT, Void> checkerFn, PCollectionView<ActualT> actual) {
+    private CheckerDoFn(
+        SerializableFunction<ActualT, Void> checkerFn,
+        PCollectionView<ActualT> actual) {
       this.checkerFn = checkerFn;
       this.actual = actual;
     }
@@ -674,9 +627,12 @@ public class PAssert {
     public void processElement(ProcessContext c) {
       try {
         ActualT actualContents = c.sideInput(actual);
-        doChecks(actualContents, checkerFn, success, failure);
+        checkerFn.apply(actualContents);
+        success.addValue(1);
       } catch (Throwable t) {
-        // Suppress exception in streaming
+        LOG.error("PAssert failed expectations.", t);
+        failure.addValue(1);
+        // TODO: allow for metrics to propagate on failure when running a streaming pipeline
         if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
           throw t;
         }
@@ -685,89 +641,87 @@ public class PAssert {
   }
 
   /**
-   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
-   * the single iterable element of the input {@link PCollection} and adjusts counters and
-   * thrown exceptions for use in testing.
+   * An assertion checker that takes a {@link PCollectionView PCollectionView&lt;ActualT&gt;},
+   * a {@link PCollectionView PCollectionView&lt;ExpectedT&gt;}, a relation
+   * over {@code A} and {@code B}, and checks that the relation holds
+   * within a dataflow pipeline.
    *
-   * <p>The singleton property is presumed, not enforced.
+   * <p>This is useful when either/both of {@code A} and {@code B}
+   * are not serializable, but have coders (provided
+   * by the underlying {@link PCollection}s).
    */
-  private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> {
-    private final SerializableFunction<ActualT, Void> checkerFn;
-    private final Aggregator<Integer, Integer> success =
-        createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
-    private final Aggregator<Integer, Integer> failure =
-        createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+  public static class TwoSideInputAssert<ActualT, ExpectedT>
+      extends PTransform<PBegin, PDone> implements Serializable {
 
-    private GroupedValuesCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
-      this.checkerFn = checkerFn;
+    private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
+    private final transient PTransform<PBegin, PCollectionView<ExpectedT>> createExpected;
+    private final AssertRelation<ActualT, ExpectedT> relation;
+
+    protected TwoSideInputAssert(
+        PTransform<PBegin, PCollectionView<ActualT>> createActual,
+        PTransform<PBegin, PCollectionView<ExpectedT>> createExpected,
+        AssertRelation<ActualT, ExpectedT> relation) {
+      this.createActual = createActual;
+      this.createExpected = createExpected;
+      this.relation = relation;
     }
 
     @Override
-    public void processElement(ProcessContext c) {
-      try {
-        doChecks(c.element(), checkerFn, success, failure);
-      } catch (Throwable t) {
-        // Suppress exception in streaming
-        if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
-          throw t;
-        }
-      }
-    }
-  }
+    public PDone apply(PBegin input) {
+      final PCollectionView<ActualT> actual = input.apply("CreateActual", createActual);
+      final PCollectionView<ExpectedT> expected = input.apply("CreateExpected", createExpected);
 
-  /**
-   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
-   * the single item contained within the single iterable on input and
-   * adjusts counters and thrown exceptions for use in testing.
-   *
-   * <p>The singleton property of the input {@link PCollection} is presumed, not enforced. However,
-   * each input element must be a singleton iterable, or this will fail.
-   */
-  private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> {
-    private final SerializableFunction<ActualT, Void> checkerFn;
-    private final Aggregator<Integer, Integer> success =
-        createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
-    private final Aggregator<Integer, Integer> failure =
-        createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+      input
+          .apply(Create.of(0).withCoder(VarIntCoder.of()))
+          .apply("RunChecks", ParDo.withSideInputs(actual, expected)
+              .of(new CheckerDoFn<>(relation, actual, expected)));
 
-    private SingletonCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
-      this.checkerFn = checkerFn;
+      return PDone.in(input.getPipeline());
     }
 
-    @Override
-    public void processElement(ProcessContext c) {
-      try {
-        ActualT actualContents = Iterables.getOnlyElement(c.element());
-        doChecks(actualContents, checkerFn, success, failure);
-      } catch (Throwable t) {
-        // Suppress exception in streaming
-        if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
-          throw t;
-        }
+    /**
+     * Input is ignored, but is {@link Integer} for runners that do not support null values.
+     */
+    private static class CheckerDoFn<ActualT, ExpectedT> extends DoFn<Integer, Void> {
+      private final Aggregator<Integer, Integer> success =
+          createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+      private final Aggregator<Integer, Integer> failure =
+          createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+      private final AssertRelation<ActualT, ExpectedT> relation;
+      private final PCollectionView<ActualT> actual;
+      private final PCollectionView<ExpectedT> expected;
+
+      private CheckerDoFn(AssertRelation<ActualT, ExpectedT> relation,
+          PCollectionView<ActualT> actual, PCollectionView<ExpectedT> expected) {
+        this.relation = relation;
+        this.actual = actual;
+        this.expected = expected;
       }
-    }
-  }
 
-  private static <ActualT> void doChecks(
-      ActualT actualContents,
-      SerializableFunction<ActualT, Void> checkerFn,
-      Aggregator<Integer, Integer> successAggregator,
-      Aggregator<Integer, Integer> failureAggregator) {
-    try {
-      checkerFn.apply(actualContents);
-      successAggregator.addValue(1);
-    } catch (Throwable t) {
-      LOG.error("PAssert failed expectations.", t);
-      failureAggregator.addValue(1);
-      throw t;
+      @Override
+      public void processElement(ProcessContext c) {
+        try {
+          ActualT actualContents = c.sideInput(actual);
+          ExpectedT expectedContents = c.sideInput(expected);
+          relation.assertFor(expectedContents).apply(actualContents);
+          success.addValue(1);
+        } catch (Throwable t) {
+          LOG.error("PAssert failed expectations.", t);
+          failure.addValue(1);
+          // TODO: allow for metrics to propagate on failure when running a streaming pipeline
+          if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
+            throw t;
+          }
+        }
+      }
     }
   }
 
   /////////////////////////////////////////////////////////////////////////////
 
   /**
-   * A {@link SerializableFunction} that verifies that an actual value is equal to an expected
-   * value.
+   * A {@link SerializableFunction} that verifies that an actual value is equal to an
+   * expected value.
    */
   private static class AssertIsEqualTo<T> implements SerializableFunction<T, Void> {
     private T expected;
@@ -784,8 +738,8 @@ public class PAssert {
   }
 
   /**
-   * A {@link SerializableFunction} that verifies that an actual value is not equal to an expected
-   * value.
+   * A {@link SerializableFunction} that verifies that an actual value is not equal to an
+   * expected value.
    */
   private static class AssertNotEqualTo<T> implements SerializableFunction<T, Void> {
     private T expected;
@@ -802,8 +756,8 @@ public class PAssert {
   }
 
   /**
-   * A {@link SerializableFunction} that verifies that an {@code Iterable} contains expected items
-   * in any order.
+   * A {@link SerializableFunction} that verifies that an {@code Iterable} contains
+   * expected items in any order.
    */
   private static class AssertContainsInAnyOrder<T>
       implements SerializableFunction<Iterable<T>, Void> {
@@ -833,9 +787,10 @@ public class PAssert {
   ////////////////////////////////////////////////////////////
 
   /**
-   * A binary predicate between types {@code Actual} and {@code Expected}. Implemented as a method
-   * {@code assertFor(Expected)} which returns a {@code SerializableFunction<Actual, Void>} that
-   * should verify the assertion..
+   * A binary predicate between types {@code Actual} and {@code Expected}.
+   * Implemented as a method {@code assertFor(Expected)} which returns
+   * a {@code SerializableFunction<Actual, Void>}
+   * that should verify the assertion..
    */
   private static interface AssertRelation<ActualT, ExpectedT> extends Serializable {
     public SerializableFunction<ActualT, Void> assertFor(ExpectedT input);
@@ -844,7 +799,8 @@ public class PAssert {
   /**
    * An {@link AssertRelation} implementing the binary predicate that two objects are equal.
    */
-  private static class AssertIsEqualToRelation<T> implements AssertRelation<T, T> {
+  private static class AssertIsEqualToRelation<T>
+      implements AssertRelation<T, T> {
     @Override
     public SerializableFunction<T, Void> assertFor(T expected) {
       return new AssertIsEqualTo<T>(expected);
@@ -854,7 +810,8 @@ public class PAssert {
   /**
    * An {@link AssertRelation} implementing the binary predicate that two objects are not equal.
    */
-  private static class AssertNotEqualToRelation<T> implements AssertRelation<T, T> {
+  private static class AssertNotEqualToRelation<T>
+      implements AssertRelation<T, T> {
     @Override
     public SerializableFunction<T, Void> assertFor(T expected) {
       return new AssertNotEqualTo<T>(expected);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045b568f/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index fdc8719..f540948 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.testing;
 
+import static org.apache.beam.sdk.testing.SerializableMatchers.anything;
+import static org.apache.beam.sdk.testing.SerializableMatchers.not;
+
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -148,6 +151,30 @@ public class PAssertTest implements Serializable {
   }
 
   /**
+   * Basic test of succeeding {@link PAssert} using a {@link SerializableMatcher}.
+   */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBasicMatcherSuccess() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
+    PAssert.that(pcollection).containsInAnyOrder(anything());
+    pipeline.run();
+  }
+
+  /**
+   * Basic test of failing {@link PAssert} using a {@link SerializableMatcher}.
+   */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBasicMatcherFailure() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
+    PAssert.that(pcollection).containsInAnyOrder(not(anything()));
+    runExpectingAssertionFailure(pipeline);
+  }
+
+  /**
    * Test that we throw an error at pipeline construction time when the user mistakenly uses
    * {@code PAssert.thatSingleton().equals()} instead of the test method {@code .isEqualTo}.
    */


[2/2] incubator-beam git commit: This closes #457

Posted by ke...@apache.org.
This closes #457


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c8ad2e7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c8ad2e7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c8ad2e7d

Branch: refs/heads/master
Commit: c8ad2e7dd5443ba40d126dca4cd3cb29b33103cf
Parents: f9a9214 045b568
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 14 08:52:03 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 14 08:52:03 2016 -0700

----------------------------------------------------------------------
 .../testing/TestDataflowPipelineRunner.java     |   3 +-
 .../org/apache/beam/sdk/testing/PAssert.java    | 779 +++++++++----------
 .../apache/beam/sdk/testing/PAssertTest.java    |  27 +
 3 files changed, 396 insertions(+), 413 deletions(-)
----------------------------------------------------------------------