You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/14 15:52:24 UTC
[1/2] incubator-beam git commit: Revert GBK-based PAssert
Repository: incubator-beam
Updated Branches:
refs/heads/master f9a9214dd -> c8ad2e7dd
Revert GBK-based PAssert
This change neglected the use of counters by the Dataflow runner,
which are used to prevent tests from spuriously passing when
a PCollection is empty.
Obvious fixes for that revealed probable bugs in the in-process
and Spark runners, as well as tests that happen to work with
PAssert but are actually unsupported.
A proper long-term fix is underway to address all of the above.
In the meantime, this commit rolls back the changes to PAssert.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/045b568f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/045b568f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/045b568f
Branch: refs/heads/master
Commit: 045b568f6be4b7b010d4fd4cfdd1536db943ce54
Parents: f9a9214
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 14 08:05:04 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 14 08:05:41 2016 -0700
----------------------------------------------------------------------
.../testing/TestDataflowPipelineRunner.java | 3 +-
.../org/apache/beam/sdk/testing/PAssert.java | 779 +++++++++----------
.../apache/beam/sdk/testing/PAssertTest.java | 27 +
3 files changed, 396 insertions(+), 413 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045b568f/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
index c940e9a..3e8d903 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
@@ -166,8 +166,7 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
public <OutputT extends POutput, InputT extends PInput> OutputT apply(
PTransform<InputT, OutputT> transform, InputT input) {
if (transform instanceof PAssert.OneSideInputAssert
- || transform instanceof PAssert.GroupThenAssert
- || transform instanceof PAssert.GroupThenAssertForSingleton) {
+ || transform instanceof PAssert.TwoSideInputAssert) {
expectedNumberOfAssertions += 1;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045b568f/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 62d3599..c2cd598 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -34,14 +34,11 @@ import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CoderUtils;
@@ -51,27 +48,32 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
+import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
/**
- * An assertion on the contents of a {@link PCollection} incorporated into the pipeline. Such an
- * assertion can be checked no matter what kind of {@link PipelineRunner} is used.
+ * An assertion on the contents of a {@link PCollection}
+ * incorporated into the pipeline. Such an assertion
+ * can be checked no matter what kind of {@link PipelineRunner} is
+ * used.
*
- * <p>Note that the {@code PAssert} call must precede the call to {@link Pipeline#run}.
+ * <p>Note that the {@code PAssert} call must precede the call
+ * to {@link Pipeline#run}.
*
- * <p>Examples of use: <pre>{@code
+ * <p>Examples of use:
+ * <pre>{@code
* Pipeline p = TestPipeline.create();
* ...
* PCollection<String> output =
@@ -105,84 +107,30 @@ public class PAssert {
private PAssert() {}
/**
- * Builder interface for assertions applicable to iterables and PCollection contents.
- */
- public interface IterableAssert<T> {
-
- /**
- * Asserts that the iterable in question contains the provided elements.
- *
- * @return the same {@link IterableAssert} builder for further assertions
- */
- IterableAssert<T> containsInAnyOrder(T... expectedElements);
-
- /**
- * Asserts that the iterable in question contains the provided elements.
- *
- * @return the same {@link IterableAssert} builder for further assertions
- */
- IterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements);
-
- /**
- * Asserts that the iterable in question is empty.
- *
- * @return the same {@link IterableAssert} builder for further assertions
- */
- IterableAssert<T> empty();
-
- /**
- * Applies the provided checking function (presumably containing assertions) to the
- * iterable in question.
- *
- * @return the same {@link IterableAssert} builder for further assertions
- */
- IterableAssert<T> satisfies(SerializableFunction<Iterable<T>, Void> checkerFn);
- }
-
- /**
- * Builder interface for assertions applicable to a single value.
- */
- public interface SingletonAssert<T> {
- /**
- * Asserts that the value in question is equal to the provided value, according to
- * {@link Object#equals}.
- *
- * @return the same {@link SingletonAssert} builder for further assertions
- */
- SingletonAssert<T> isEqualTo(T expected);
-
- /**
- * Asserts that the value in question is not equal to the provided value, according
- * to {@link Object#equals}.
- *
- * @return the same {@link SingletonAssert} builder for further assertions
- */
- SingletonAssert<T> notEqualTo(T notExpected);
-
- /**
- * Applies the provided checking function (presumably containing assertions) to the
- * value in question.
- *
- * @return the same {@link SingletonAssert} builder for further assertions
- */
- SingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn);
- }
-
- /**
- * Constructs an {@link IterableAssert} for the elements of the provided {@link PCollection}.
+ * Constructs an {@link IterableAssert} for the elements of the provided
+ * {@link PCollection}.
*/
public static <T> IterableAssert<T> that(PCollection<T> actual) {
- return new PCollectionContentsAssert<>(actual);
+ return new IterableAssert<>(
+ new CreateActual<T, Iterable<T>>(actual, View.<T>asIterable()),
+ actual.getPipeline())
+ .setCoder(actual.getCoder());
}
/**
- * Constructs an {@link IterableAssert} for the value of the provided {@link PCollection} which
- * must contain a single {@code Iterable<T>} value.
+ * Constructs an {@link IterableAssert} for the value of the provided
+ * {@link PCollection} which must contain a single {@code Iterable<T>}
+ * value.
*/
- public static <T> IterableAssert<T> thatSingletonIterable(
- PCollection<? extends Iterable<T>> actual) {
+ public static <T> IterableAssert<T>
+ thatSingletonIterable(PCollection<? extends Iterable<T>> actual) {
+ List<? extends Coder<?>> maybeElementCoder = actual.getCoder().getCoderArguments();
+ Coder<T> tCoder;
try {
+ @SuppressWarnings("unchecked")
+ Coder<T> tCoderTmp = (Coder<T>) Iterables.getOnlyElement(maybeElementCoder);
+ tCoder = tCoderTmp;
} catch (NoSuchElementException | IllegalArgumentException exc) {
throw new IllegalArgumentException(
"PAssert.<T>thatSingletonIterable requires a PCollection<Iterable<T>>"
@@ -193,7 +141,19 @@ public class PAssert {
@SuppressWarnings("unchecked") // Safe covariant cast
PCollection<Iterable<T>> actualIterables = (PCollection<Iterable<T>>) actual;
- return new PCollectionSingletonIterableAssert<>(actualIterables);
+ return new IterableAssert<>(
+ new CreateActual<Iterable<T>, Iterable<T>>(
+ actualIterables, View.<Iterable<T>>asSingleton()),
+ actual.getPipeline())
+ .setCoder(tCoder);
+ }
+
+ /**
+ * Constructs an {@link IterableAssert} for the value of the provided
+ * {@code PCollectionView PCollectionView<Iterable<T>>}.
+ */
+ public static <T> IterableAssert<T> thatIterable(PCollectionView<Iterable<T>> actual) {
+ return new IterableAssert<>(new PreExisting<Iterable<T>>(actual), actual.getPipeline());
}
/**
@@ -201,95 +161,93 @@ public class PAssert {
* {@code PCollection PCollection<T>}, which must be a singleton.
*/
public static <T> SingletonAssert<T> thatSingleton(PCollection<T> actual) {
- return new PCollectionViewAssert<>(actual, View.<T>asSingleton(), actual.getCoder());
+ return new SingletonAssert<>(
+ new CreateActual<T, T>(actual, View.<T>asSingleton()), actual.getPipeline())
+ .setCoder(actual.getCoder());
}
/**
* Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}.
*
- * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any
- * {@code Coder<K, V>}.
+ * <p>Note that the actual value must be coded by a {@link KvCoder},
+ * not just any {@code Coder<K, V>}.
*/
- public static <K, V> SingletonAssert<Map<K, Iterable<V>>> thatMultimap(
- PCollection<KV<K, V>> actual) {
+ public static <K, V> SingletonAssert<Map<K, Iterable<V>>>
+ thatMultimap(PCollection<KV<K, V>> actual) {
@SuppressWarnings("unchecked")
KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
- return new PCollectionViewAssert<>(
- actual,
- View.<K, V>asMultimap(),
- MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder())));
+
+ return new SingletonAssert<>(
+ new CreateActual<>(actual, View.<K, V>asMultimap()), actual.getPipeline())
+ .setCoder(MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder())));
}
/**
- * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}, which
- * must have at most one value per key.
+ * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection},
+ * which must have at most one value per key.
*
- * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any
- * {@code Coder<K, V>}.
+ * <p>Note that the actual value must be coded by a {@link KvCoder},
+ * not just any {@code Coder<K, V>}.
*/
public static <K, V> SingletonAssert<Map<K, V>> thatMap(PCollection<KV<K, V>> actual) {
@SuppressWarnings("unchecked")
KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
- return new PCollectionViewAssert<>(
- actual, View.<K, V>asMap(), MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()));
+
+ return new SingletonAssert<>(
+ new CreateActual<>(actual, View.<K, V>asMap()), actual.getPipeline())
+ .setCoder(MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()));
}
////////////////////////////////////////////////////////////
/**
- * An {@link IterableAssert} about the contents of a {@link PCollection}. This does not require
- * the runner to support side inputs.
+ * An assertion about the contents of a {@link PCollectionView} yielding an {@code Iterable<T>}.
*/
- private static class PCollectionContentsAssert<T> implements IterableAssert<T> {
- private final PCollection<T> actual;
+ public static class IterableAssert<T> implements Serializable {
+ private final Pipeline pipeline;
+ private final PTransform<PBegin, PCollectionView<Iterable<T>>> createActual;
+ private Optional<Coder<T>> coder;
- public PCollectionContentsAssert(PCollection<T> actual) {
- this.actual = actual;
+ protected IterableAssert(
+ PTransform<PBegin, PCollectionView<Iterable<T>>> createActual, Pipeline pipeline) {
+ this.createActual = createActual;
+ this.pipeline = pipeline;
+ this.coder = Optional.absent();
}
/**
- * Checks that the {@code Iterable} contains the expected elements, in any order.
+ * Sets the coder to use for elements of type {@code T}, as needed for internal purposes.
*
* <p>Returns this {@code IterableAssert}.
*/
- @Override
- @SafeVarargs
- public final PCollectionContentsAssert<T> containsInAnyOrder(T... expectedElements) {
- return containsInAnyOrder(Arrays.asList(expectedElements));
+ public IterableAssert<T> setCoder(Coder<T> coderOrNull) {
+ this.coder = Optional.fromNullable(coderOrNull);
+ return this;
}
/**
- * Checks that the {@code Iterable} contains the expected elements, in any order.
- *
- * <p>Returns this {@code IterableAssert}.
+ * Gets the coder, which may yet be absent.
*/
- @Override
- public PCollectionContentsAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
- return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
- }
-
- @Override
- public PCollectionContentsAssert<T> empty() {
- return containsInAnyOrder(Collections.<T>emptyList());
- }
-
- @Override
- public PCollectionContentsAssert<T> satisfies(
- SerializableFunction<Iterable<T>, Void> checkerFn) {
- actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn));
- return this;
+ public Coder<T> getCoder() {
+ if (coder.isPresent()) {
+ return coder.get();
+ } else {
+ throw new IllegalStateException(
+ "Attempting to access the coder of an IterableAssert"
+ + " that has not been set yet.");
+ }
}
/**
- * Checks that the {@code Iterable} contains elements that match the provided matchers, in any
- * order.
+ * Applies a {@link SerializableFunction} to check the elements of the {@code Iterable}.
*
* <p>Returns this {@code IterableAssert}.
*/
- @SafeVarargs
- final PCollectionContentsAssert<T> containsInAnyOrder(
- SerializableMatcher<? super T>... elementMatchers) {
- return satisfies(SerializableMatchers.<T>containsInAnyOrder(elementMatchers));
+ public IterableAssert<T> satisfies(SerializableFunction<Iterable<T>, Void> checkerFn) {
+ pipeline.apply(
+ "PAssert$" + (assertCount++),
+ new OneSideInputAssert<Iterable<T>>(createActual, checkerFn));
+ return this;
}
/**
@@ -297,11 +255,17 @@ public class PAssert {
*
* <p>Returns this {@code IterableAssert}.
*/
- private PCollectionContentsAssert<T> satisfies(
- AssertRelation<Iterable<T>, Iterable<T>> relation, Iterable<T> expectedElements) {
- return satisfies(
- new CheckRelationAgainstExpected<Iterable<T>>(
- relation, expectedElements, IterableCoder.of(actual.getCoder())));
+ public IterableAssert<T> satisfies(
+ AssertRelation<Iterable<T>, Iterable<T>> relation,
+ final Iterable<T> expectedElements) {
+ pipeline.apply(
+ "PAssert$" + (assertCount++),
+ new TwoSideInputAssert<Iterable<T>, Iterable<T>>(
+ createActual,
+ new CreateExpected<T, Iterable<T>>(expectedElements, coder, View.<T>asIterable()),
+ relation));
+
+ return this;
}
/**
@@ -309,14 +273,15 @@ public class PAssert {
*
* <p>Returns this {@code IterableAssert}.
*/
- PCollectionContentsAssert<T> satisfies(
- final SerializableMatcher<Iterable<? extends T>> matcher) {
+ IterableAssert<T> satisfies(final SerializableMatcher<Iterable<? extends T>> matcher) {
// Safe covariant cast. Could be elided by changing a lot of this file to use
// more flexible bounds.
@SuppressWarnings({"rawtypes", "unchecked"})
SerializableFunction<Iterable<T>, Void> checkerFn =
- (SerializableFunction) new MatcherCheckerFn<>(matcher);
- actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn));
+ (SerializableFunction) new MatcherCheckerFn<>(matcher);
+ pipeline.apply(
+ "PAssert$" + (assertCount++),
+ new OneSideInputAssert<Iterable<T>>(createActual, checkerFn));
return this;
}
@@ -335,9 +300,19 @@ public class PAssert {
}
/**
+ * Checks that the {@code Iterable} is empty.
+ *
+ * <p>Returns this {@code IterableAssert}.
+ */
+ public IterableAssert<T> empty() {
+ return satisfies(new AssertContainsInAnyOrderRelation<T>(), Collections.<T>emptyList());
+ }
+
+ /**
* @throws UnsupportedOperationException always
- * @deprecated {@link Object#equals(Object)} is not supported on PAssert objects. If you meant
- * to test object equality, use a variant of {@link #containsInAnyOrder} instead.
+ * @deprecated {@link Object#equals(Object)} is not supported on PAssert objects.
+ * If you meant to test object equality, use a variant of {@link #containsInAnyOrder}
+ * instead.
*/
@Deprecated
@Override
@@ -356,129 +331,169 @@ public class PAssert {
throw new UnsupportedOperationException(
String.format("%s.hashCode() is not supported.", IterableAssert.class.getSimpleName()));
}
- }
- /**
- * An {@link IterableAssert} for an iterable that is the sole element of a {@link PCollection}.
- * This does not require the runner to support side inputs.
- */
- private static class PCollectionSingletonIterableAssert<T> implements IterableAssert<T> {
- private final PCollection<Iterable<T>> actual;
- private final Coder<T> elementCoder;
-
- public PCollectionSingletonIterableAssert(PCollection<Iterable<T>> actual) {
- this.actual = actual;
+ /**
+ * Checks that the {@code Iterable} contains the expected elements, in any
+ * order.
+ *
+ * <p>Returns this {@code IterableAssert}.
+ */
+ public IterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
+ return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
+ }
- @SuppressWarnings("unchecked")
- Coder<T> typedCoder = (Coder<T>) actual.getCoder().getCoderArguments().get(0);
- this.elementCoder = typedCoder;
+ /**
+ * Checks that the {@code Iterable} contains the expected elements, in any
+ * order.
+ *
+ * <p>Returns this {@code IterableAssert}.
+ */
+ @SafeVarargs
+ public final IterableAssert<T> containsInAnyOrder(T... expectedElements) {
+ return satisfies(
+ new AssertContainsInAnyOrderRelation<T>(),
+ Arrays.asList(expectedElements));
}
- @Override
+ /**
+ * Checks that the {@code Iterable} contains elements that match the provided matchers,
+ * in any order.
+ *
+ * <p>Returns this {@code IterableAssert}.
+ */
@SafeVarargs
- public final PCollectionSingletonIterableAssert<T> containsInAnyOrder(T... expectedElements) {
- return containsInAnyOrder(Arrays.asList(expectedElements));
+ final IterableAssert<T> containsInAnyOrder(
+ SerializableMatcher<? super T>... elementMatchers) {
+ return satisfies(SerializableMatchers.<T>containsInAnyOrder(elementMatchers));
}
+ }
- @Override
- public PCollectionSingletonIterableAssert<T> empty() {
- return containsInAnyOrder(Collections.<T>emptyList());
+ /**
+ * An assertion about the single value of type {@code T}
+ * associated with a {@link PCollectionView}.
+ */
+ public static class SingletonAssert<T> implements Serializable {
+ private final Pipeline pipeline;
+ private final CreateActual<?, T> createActual;
+ private Optional<Coder<T>> coder;
+
+ protected SingletonAssert(
+ CreateActual<?, T> createActual, Pipeline pipeline) {
+ this.pipeline = pipeline;
+ this.createActual = createActual;
+ this.coder = Optional.absent();
}
+ /**
+ * Always throws an {@link UnsupportedOperationException}: users are probably looking for
+ * {@link #isEqualTo}.
+ */
+ @Deprecated
@Override
- public PCollectionSingletonIterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
- return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
+ public boolean equals(Object o) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "tests for Java equality of the %s object, not the PCollection in question. "
+ + "Call a test method, such as isEqualTo.",
+ getClass().getSimpleName()));
}
+ /**
+ * @throws UnsupportedOperationException always.
+ * @deprecated {@link Object#hashCode()} is not supported on PAssert objects.
+ */
+ @Deprecated
@Override
- public PCollectionSingletonIterableAssert<T> satisfies(
- SerializableFunction<Iterable<T>, Void> checkerFn) {
- actual.apply("PAssert$" + (assertCount++), new GroupThenAssertForSingleton<>(checkerFn));
- return this;
+ public int hashCode() {
+ throw new UnsupportedOperationException(
+ String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName()));
}
- private PCollectionSingletonIterableAssert<T> satisfies(
- AssertRelation<Iterable<T>, Iterable<T>> relation, Iterable<T> expectedElements) {
- return satisfies(
- new CheckRelationAgainstExpected<Iterable<T>>(
- relation, expectedElements, IterableCoder.of(elementCoder)));
+ /**
+ * Sets the coder to use for elements of type {@code T}, as needed
+ * for internal purposes.
+ *
+ * <p>Returns this {@code IterableAssert}.
+ */
+ public SingletonAssert<T> setCoder(Coder<T> coderOrNull) {
+ this.coder = Optional.fromNullable(coderOrNull);
+ return this;
}
- }
- /**
- * An assertion about the contents of a {@link PCollection} when it is viewed as a single value
- * of type {@code ViewT}. This requires side input support from the runner.
- */
- private static class PCollectionViewAssert<ElemT, ViewT> implements SingletonAssert<ViewT> {
- private final PCollection<ElemT> actual;
- private final PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view;
- private final Coder<ViewT> coder;
-
- protected PCollectionViewAssert(
- PCollection<ElemT> actual,
- PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view,
- Coder<ViewT> coder) {
- this.actual = actual;
- this.view = view;
- this.coder = coder;
+ /**
+ * Gets the coder, which may yet be absent.
+ */
+ public Coder<T> getCoder() {
+ if (coder.isPresent()) {
+ return coder.get();
+ } else {
+ throw new IllegalStateException(
+ "Attempting to access the coder of an IterableAssert that has not been set yet.");
+ }
}
- @Override
- public PCollectionViewAssert<ElemT, ViewT> isEqualTo(ViewT expectedValue) {
- return satisfies(new AssertIsEqualToRelation<ViewT>(), expectedValue);
+ /**
+ * Applies a {@link SerializableFunction} to check the value of this
+ * {@code SingletonAssert}'s view.
+ *
+ * <p>Returns this {@code SingletonAssert}.
+ */
+ public SingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn) {
+ pipeline.apply(
+ "PAssert$" + (assertCount++), new OneSideInputAssert<T>(createActual, checkerFn));
+ return this;
}
- @Override
- public PCollectionViewAssert<ElemT, ViewT> notEqualTo(ViewT expectedValue) {
- return satisfies(new AssertNotEqualToRelation<ViewT>(), expectedValue);
- }
+ /**
+ * Applies an {@link AssertRelation} to check the provided relation against the
+ * value of this assert and the provided expected value.
+ *
+ * <p>Returns this {@code SingletonAssert}.
+ */
+ public SingletonAssert<T> satisfies(
+ AssertRelation<T, T> relation,
+ final T expectedValue) {
+ pipeline.apply(
+ "PAssert$" + (assertCount++),
+ new TwoSideInputAssert<T, T>(
+ createActual,
+ new CreateExpected<T, T>(Arrays.asList(expectedValue), coder, View.<T>asSingleton()),
+ relation));
- @Override
- public PCollectionViewAssert<ElemT, ViewT> satisfies(
- SerializableFunction<ViewT, Void> checkerFn) {
- actual
- .getPipeline()
- .apply(
- "PAssert$" + (assertCount++),
- new OneSideInputAssert<ViewT>(CreateActual.from(actual, view), checkerFn));
return this;
}
/**
- * Applies an {@link AssertRelation} to check the provided relation against the value of this
- * assert and the provided expected value.
+ * Checks that the value of this {@code SingletonAssert}'s view is equal
+ * to the expected value.
*
* <p>Returns this {@code SingletonAssert}.
*/
- private PCollectionViewAssert<ElemT, ViewT> satisfies(
- AssertRelation<ViewT, ViewT> relation, final ViewT expectedValue) {
- return satisfies(new CheckRelationAgainstExpected<ViewT>(relation, expectedValue, coder));
+ public SingletonAssert<T> isEqualTo(T expectedValue) {
+ return satisfies(new AssertIsEqualToRelation<T>(), expectedValue);
}
/**
- * Always throws an {@link UnsupportedOperationException}: users are probably looking for
- * {@link #isEqualTo}.
+ * Checks that the value of this {@code SingletonAssert}'s view is not equal
+ * to the expected value.
+ *
+ * <p>Returns this {@code SingletonAssert}.
*/
- @Deprecated
- @Override
- public boolean equals(Object o) {
- throw new UnsupportedOperationException(
- String.format(
- "tests for Java equality of the %s object, not the PCollection in question. "
- + "Call a test method, such as isEqualTo.",
- getClass().getSimpleName()));
+ public SingletonAssert<T> notEqualTo(T expectedValue) {
+ return satisfies(new AssertNotEqualToRelation<T>(), expectedValue);
}
/**
- * @throws UnsupportedOperationException always.
- * @deprecated {@link Object#hashCode()} is not supported on {@link PAssert} objects.
+ * Checks that the value of this {@code SingletonAssert}'s view is equal to
+ * the expected value.
+ *
+ * @deprecated replaced by {@link #isEqualTo}
*/
@Deprecated
- @Override
- public int hashCode() {
- throw new UnsupportedOperationException(
- String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName()));
+ public SingletonAssert<T> is(T expectedValue) {
+ return isEqualTo(expectedValue);
}
+
}
////////////////////////////////////////////////////////////////////////
@@ -489,13 +504,8 @@ public class PAssert {
private final transient PCollection<T> actual;
private final transient PTransform<PCollection<T>, PCollectionView<ActualT>> actualView;
- public static <T, ActualT> CreateActual<T, ActualT> from(
- PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
- return new CreateActual<>(actual, actualView);
- }
-
- private CreateActual(
- PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
+ private CreateActual(PCollection<T> actual,
+ PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
this.actual = actual;
this.actualView = actualView;
}
@@ -505,129 +515,73 @@ public class PAssert {
final Coder<T> coder = actual.getCoder();
return actual
.apply(Window.<T>into(new GlobalWindows()))
- .apply(
- ParDo.of(
- new DoFn<T, T>() {
- @Override
- public void processElement(ProcessContext context) throws CoderException {
- context.output(CoderUtils.clone(coder, context.element()));
- }
- }))
+ .apply(ParDo.of(new DoFn<T, T>() {
+ @Override
+ public void processElement(ProcessContext context) throws CoderException {
+ context.output(CoderUtils.clone(coder, context.element()));
+ }
+ }))
.apply(actualView);
}
}
- /**
- * A partially applied {@link AssertRelation}, where one value is provided along with a coder to
- * serialize/deserialize them.
- */
- private static class CheckRelationAgainstExpected<T> implements SerializableFunction<T, Void> {
- private final AssertRelation<T, T> relation;
- private final byte[] encodedExpected;
- private final Coder<T> coder;
+ private static class CreateExpected<T, ExpectedT>
+ extends PTransform<PBegin, PCollectionView<ExpectedT>> {
- public CheckRelationAgainstExpected(AssertRelation<T, T> relation, T expected, Coder<T> coder) {
- this.relation = relation;
- this.coder = coder;
+ private final Iterable<T> elements;
+ private final Optional<Coder<T>> coder;
+ private final transient PTransform<PCollection<T>, PCollectionView<ExpectedT>> view;
- try {
- this.encodedExpected = CoderUtils.encodeToByteArray(coder, expected);
- } catch (IOException coderException) {
- throw new RuntimeException(coderException);
- }
+ private CreateExpected(Iterable<T> elements, Optional<Coder<T>> coder,
+ PTransform<PCollection<T>, PCollectionView<ExpectedT>> view) {
+ this.elements = elements;
+ this.coder = coder;
+ this.view = view;
}
@Override
- public Void apply(T actual) {
- try {
- T expected = CoderUtils.decodeFromByteArray(coder, encodedExpected);
- return relation.assertFor(expected).apply(actual);
- } catch (IOException coderException) {
- throw new RuntimeException(coderException);
+ public PCollectionView<ExpectedT> apply(PBegin input) {
+ Create.Values<T> createTransform = Create.<T>of(elements);
+ if (coder.isPresent()) {
+ createTransform = createTransform.withCoder(coder.get());
}
+ return input.apply(createTransform).apply(view);
}
}
- /**
- * A transform that gathers the contents of a {@link PCollection} into a single main input
- * iterable in the global window. This requires a runner to support {@link GroupByKey} in the
- * global window, but not side inputs or other windowing or triggers.
- */
- private static class GroupGlobally<T> extends PTransform<PCollection<T>, PCollection<Iterable<T>>>
- implements Serializable {
-
- public GroupGlobally() {}
-
- @Override
- public PCollection<Iterable<T>> apply(PCollection<T> input) {
- return input
- .apply("GloballyWindow", Window.<T>into(new GlobalWindows()))
- .apply("DummyKey", WithKeys.<Integer, T>of(0))
- .apply("GroupByKey", GroupByKey.<Integer, T>create())
- .apply("GetOnlyValue", Values.<Iterable<T>>create());
- }
- }
-
- /**
- * A transform that applies an assertion-checking function over iterables of {@code ActualT} to
- * the entirety of the contents of its input.
- */
- public static class GroupThenAssert<T> extends PTransform<PCollection<T>, PDone>
- implements Serializable {
- private final SerializableFunction<Iterable<T>, Void> checkerFn;
-
- private GroupThenAssert(SerializableFunction<Iterable<T>, Void> checkerFn) {
- this.checkerFn = checkerFn;
- }
-
- @Override
- public PDone apply(PCollection<T> input) {
- input
- .apply("GroupGlobally", new GroupGlobally<T>())
- .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn)));
+ private static class PreExisting<T> extends PTransform<PBegin, PCollectionView<T>> {
- return PDone.in(input.getPipeline());
- }
- }
+ private final PCollectionView<T> view;
- /**
- * A transform that applies an assertion-checking function to a single iterable contained as the
- * sole element of a {@link PCollection}.
- */
- public static class GroupThenAssertForSingleton<T>
- extends PTransform<PCollection<Iterable<T>>, PDone> implements Serializable {
- private final SerializableFunction<Iterable<T>, Void> checkerFn;
-
- private GroupThenAssertForSingleton(SerializableFunction<Iterable<T>, Void> checkerFn) {
- this.checkerFn = checkerFn;
+ private PreExisting(PCollectionView<T> view) {
+ this.view = view;
}
@Override
- public PDone apply(PCollection<Iterable<T>> input) {
- input
- .apply("GroupGlobally", new GroupGlobally<Iterable<T>>())
- .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn)));
-
- return PDone.in(input.getPipeline());
+ public PCollectionView<T> apply(PBegin input) {
+ return view;
}
}
/**
- * An assertion checker that takes a single {@link PCollectionView
- * PCollectionView<ActualT>} and an assertion over {@code ActualT}, and checks it within a
- * Beam pipeline.
+ * An assertion checker that takes a single
+ * {@link PCollectionView PCollectionView<ActualT>}
+ * and an assertion over {@code ActualT}, and checks it within a dataflow
+ * pipeline.
*
- * <p>Note that the entire assertion must be serializable.
+ * <p>Note that the entire assertion must be serializable. If
+ * you need to make assertions involving multiple inputs
+ * that are each not serializable, use TwoSideInputAssert.
*
- * <p>This is generally useful for assertion functions that are serializable but whose underlying
- * data may not have a coder.
+ * <p>This is generally useful for assertion functions that
+ * are serializable but whose underlying data may not have a coder.
*/
- public static class OneSideInputAssert<ActualT> extends PTransform<PBegin, PDone>
- implements Serializable {
+ public static class OneSideInputAssert<ActualT>
+ extends PTransform<PBegin, PDone> implements Serializable {
private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
private final SerializableFunction<ActualT, Void> checkerFn;
- private OneSideInputAssert(
+ public OneSideInputAssert(
PTransform<PBegin, PCollectionView<ActualT>> createActual,
SerializableFunction<ActualT, Void> checkerFn) {
this.createActual = createActual;
@@ -640,23 +594,21 @@ public class PAssert {
input
.apply(Create.of(0).withCoder(VarIntCoder.of()))
- .apply(
- ParDo.named("RunChecks")
- .withSideInputs(actual)
- .of(new SideInputCheckerDoFn<>(checkerFn, actual)));
+ .apply(ParDo.named("RunChecks").withSideInputs(actual)
+ .of(new CheckerDoFn<>(checkerFn, actual)));
return PDone.in(input.getPipeline());
}
}
/**
- * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of a
- * {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
+ * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
+ * a {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
*
* <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support
* null values.
*/
- private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> {
+ private static class CheckerDoFn<ActualT> extends DoFn<Integer, Void> {
private final SerializableFunction<ActualT, Void> checkerFn;
private final Aggregator<Integer, Integer> success =
createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -664,8 +616,9 @@ public class PAssert {
createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
private final PCollectionView<ActualT> actual;
- private SideInputCheckerDoFn(
- SerializableFunction<ActualT, Void> checkerFn, PCollectionView<ActualT> actual) {
+ private CheckerDoFn(
+ SerializableFunction<ActualT, Void> checkerFn,
+ PCollectionView<ActualT> actual) {
this.checkerFn = checkerFn;
this.actual = actual;
}
@@ -674,9 +627,12 @@ public class PAssert {
public void processElement(ProcessContext c) {
try {
ActualT actualContents = c.sideInput(actual);
- doChecks(actualContents, checkerFn, success, failure);
+ checkerFn.apply(actualContents);
+ success.addValue(1);
} catch (Throwable t) {
- // Suppress exception in streaming
+ LOG.error("PAssert failed expectations.", t);
+ failure.addValue(1);
+ // TODO: allow for metrics to propagate on failure when running a streaming pipeline
if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
throw t;
}
@@ -685,89 +641,87 @@ public class PAssert {
}
/**
- * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
- * the single iterable element of the input {@link PCollection} and adjusts counters and
- * thrown exceptions for use in testing.
+ * An assertion checker that takes a {@link PCollectionView PCollectionView<ActualT>},
+ * a {@link PCollectionView PCollectionView<ExpectedT>}, a relation
+ * over {@code ActualT} and {@code ExpectedT}, and checks that the relation holds
+ * within a dataflow pipeline.
*
- * <p>The singleton property is presumed, not enforced.
+ * <p>This is useful when either/both of {@code ActualT} and {@code ExpectedT}
+ * are not serializable, but have coders (provided
+ * by the underlying {@link PCollection}s).
*/
- private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> {
- private final SerializableFunction<ActualT, Void> checkerFn;
- private final Aggregator<Integer, Integer> success =
- createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
- private final Aggregator<Integer, Integer> failure =
- createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+ public static class TwoSideInputAssert<ActualT, ExpectedT>
+ extends PTransform<PBegin, PDone> implements Serializable {
- private GroupedValuesCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
- this.checkerFn = checkerFn;
+ private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
+ private final transient PTransform<PBegin, PCollectionView<ExpectedT>> createExpected;
+ private final AssertRelation<ActualT, ExpectedT> relation;
+
+ protected TwoSideInputAssert(
+ PTransform<PBegin, PCollectionView<ActualT>> createActual,
+ PTransform<PBegin, PCollectionView<ExpectedT>> createExpected,
+ AssertRelation<ActualT, ExpectedT> relation) {
+ this.createActual = createActual;
+ this.createExpected = createExpected;
+ this.relation = relation;
}
@Override
- public void processElement(ProcessContext c) {
- try {
- doChecks(c.element(), checkerFn, success, failure);
- } catch (Throwable t) {
- // Suppress exception in streaming
- if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
- throw t;
- }
- }
- }
- }
+ public PDone apply(PBegin input) {
+ final PCollectionView<ActualT> actual = input.apply("CreateActual", createActual);
+ final PCollectionView<ExpectedT> expected = input.apply("CreateExpected", createExpected);
- /**
- * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
- * the single item contained within the single iterable on input and
- * adjusts counters and thrown exceptions for use in testing.
- *
- * <p>The singleton property of the input {@link PCollection} is presumed, not enforced. However,
- * each input element must be a singleton iterable, or this will fail.
- */
- private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> {
- private final SerializableFunction<ActualT, Void> checkerFn;
- private final Aggregator<Integer, Integer> success =
- createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
- private final Aggregator<Integer, Integer> failure =
- createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+ input
+ .apply(Create.of(0).withCoder(VarIntCoder.of()))
+ .apply("RunChecks", ParDo.withSideInputs(actual, expected)
+ .of(new CheckerDoFn<>(relation, actual, expected)));
- private SingletonCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
- this.checkerFn = checkerFn;
+ return PDone.in(input.getPipeline());
}
- @Override
- public void processElement(ProcessContext c) {
- try {
- ActualT actualContents = Iterables.getOnlyElement(c.element());
- doChecks(actualContents, checkerFn, success, failure);
- } catch (Throwable t) {
- // Suppress exception in streaming
- if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
- throw t;
- }
+ /**
+ * Input is ignored, but is {@link Integer} for runners that do not support null values.
+ */
+ private static class CheckerDoFn<ActualT, ExpectedT> extends DoFn<Integer, Void> {
+ private final Aggregator<Integer, Integer> success =
+ createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+ private final Aggregator<Integer, Integer> failure =
+ createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+ private final AssertRelation<ActualT, ExpectedT> relation;
+ private final PCollectionView<ActualT> actual;
+ private final PCollectionView<ExpectedT> expected;
+
+ private CheckerDoFn(AssertRelation<ActualT, ExpectedT> relation,
+ PCollectionView<ActualT> actual, PCollectionView<ExpectedT> expected) {
+ this.relation = relation;
+ this.actual = actual;
+ this.expected = expected;
}
- }
- }
- private static <ActualT> void doChecks(
- ActualT actualContents,
- SerializableFunction<ActualT, Void> checkerFn,
- Aggregator<Integer, Integer> successAggregator,
- Aggregator<Integer, Integer> failureAggregator) {
- try {
- checkerFn.apply(actualContents);
- successAggregator.addValue(1);
- } catch (Throwable t) {
- LOG.error("PAssert failed expectations.", t);
- failureAggregator.addValue(1);
- throw t;
+ @Override
+ public void processElement(ProcessContext c) {
+ try {
+ ActualT actualContents = c.sideInput(actual);
+ ExpectedT expectedContents = c.sideInput(expected);
+ relation.assertFor(expectedContents).apply(actualContents);
+ success.addValue(1);
+ } catch (Throwable t) {
+ LOG.error("PAssert failed expectations.", t);
+ failure.addValue(1);
+ // TODO: allow for metrics to propagate on failure when running a streaming pipeline
+ if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
+ throw t;
+ }
+ }
+ }
}
}
/////////////////////////////////////////////////////////////////////////////
/**
- * A {@link SerializableFunction} that verifies that an actual value is equal to an expected
- * value.
+ * A {@link SerializableFunction} that verifies that an actual value is equal to an
+ * expected value.
*/
private static class AssertIsEqualTo<T> implements SerializableFunction<T, Void> {
private T expected;
@@ -784,8 +738,8 @@ public class PAssert {
}
/**
- * A {@link SerializableFunction} that verifies that an actual value is not equal to an expected
- * value.
+ * A {@link SerializableFunction} that verifies that an actual value is not equal to an
+ * expected value.
*/
private static class AssertNotEqualTo<T> implements SerializableFunction<T, Void> {
private T expected;
@@ -802,8 +756,8 @@ public class PAssert {
}
/**
- * A {@link SerializableFunction} that verifies that an {@code Iterable} contains expected items
- * in any order.
+ * A {@link SerializableFunction} that verifies that an {@code Iterable} contains
+ * expected items in any order.
*/
private static class AssertContainsInAnyOrder<T>
implements SerializableFunction<Iterable<T>, Void> {
@@ -833,9 +787,10 @@ public class PAssert {
////////////////////////////////////////////////////////////
/**
- * A binary predicate between types {@code Actual} and {@code Expected}. Implemented as a method
- * {@code assertFor(Expected)} which returns a {@code SerializableFunction<Actual, Void>} that
- * should verify the assertion..
+ * A binary predicate between types {@code Actual} and {@code Expected}.
+ * Implemented as a method {@code assertFor(Expected)} which returns
+ * a {@code SerializableFunction<Actual, Void>}
+ * that should verify the assertion.
*/
private static interface AssertRelation<ActualT, ExpectedT> extends Serializable {
public SerializableFunction<ActualT, Void> assertFor(ExpectedT input);
@@ -844,7 +799,8 @@ public class PAssert {
/**
* An {@link AssertRelation} implementing the binary predicate that two objects are equal.
*/
- private static class AssertIsEqualToRelation<T> implements AssertRelation<T, T> {
+ private static class AssertIsEqualToRelation<T>
+ implements AssertRelation<T, T> {
@Override
public SerializableFunction<T, Void> assertFor(T expected) {
return new AssertIsEqualTo<T>(expected);
@@ -854,7 +810,8 @@ public class PAssert {
/**
* An {@link AssertRelation} implementing the binary predicate that two objects are not equal.
*/
- private static class AssertNotEqualToRelation<T> implements AssertRelation<T, T> {
+ private static class AssertNotEqualToRelation<T>
+ implements AssertRelation<T, T> {
@Override
public SerializableFunction<T, Void> assertFor(T expected) {
return new AssertNotEqualTo<T>(expected);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045b568f/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index fdc8719..f540948 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.testing;
+import static org.apache.beam.sdk.testing.SerializableMatchers.anything;
+import static org.apache.beam.sdk.testing.SerializableMatchers.not;
+
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -148,6 +151,30 @@ public class PAssertTest implements Serializable {
}
/**
+ * Basic test of succeeding {@link PAssert} using a {@link SerializableMatcher}.
+ */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testBasicMatcherSuccess() throws Exception {
+ Pipeline pipeline = TestPipeline.create();
+ PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
+ PAssert.that(pcollection).containsInAnyOrder(anything());
+ pipeline.run();
+ }
+
+ /**
+ * Basic test of failing {@link PAssert} using a {@link SerializableMatcher}.
+ */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testBasicMatcherFailure() throws Exception {
+ Pipeline pipeline = TestPipeline.create();
+ PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
+ PAssert.that(pcollection).containsInAnyOrder(not(anything()));
+ runExpectingAssertionFailure(pipeline);
+ }
+
+ /**
* Test that we throw an error at pipeline construction time when the user mistakenly uses
* {@code PAssert.thatSingleton().equals()} instead of the test method {@code .isEqualTo}.
*/
[2/2] incubator-beam git commit: This closes #457
Posted by ke...@apache.org.
This closes #457
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c8ad2e7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c8ad2e7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c8ad2e7d
Branch: refs/heads/master
Commit: c8ad2e7dd5443ba40d126dca4cd3cb29b33103cf
Parents: f9a9214 045b568
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 14 08:52:03 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 14 08:52:03 2016 -0700
----------------------------------------------------------------------
.../testing/TestDataflowPipelineRunner.java | 3 +-
.../org/apache/beam/sdk/testing/PAssert.java | 779 +++++++++----------
.../apache/beam/sdk/testing/PAssertTest.java | 27 +
3 files changed, 396 insertions(+), 413 deletions(-)
----------------------------------------------------------------------