You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/13 17:11:37 UTC
[1/2] incubator-beam git commit: Add success/failure counters to new PAssert mechanism
Repository: incubator-beam
Updated Branches:
refs/heads/master fe5b8db6c -> a3feeefa2
Add success/failure counters to new PAssert mechanism
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd07cba3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd07cba3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd07cba3
Branch: refs/heads/master
Commit: dd07cba31ef1686495562914de13f1ca24ff9ce2
Parents: 60964b6
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Jun 12 18:29:46 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Jun 12 18:29:46 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/testing/PAssert.java | 112 ++++++++++++++-----
1 file changed, 86 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd07cba3/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index b10c1cb..62d3599 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -584,15 +584,7 @@ public class PAssert {
public PDone apply(PCollection<T> input) {
input
.apply("GroupGlobally", new GroupGlobally<T>())
- .apply(
- "RunChecks",
- ParDo.of(
- new DoFn<Iterable<T>, Void>() {
- @Override
- public void processElement(ProcessContext context) {
- checkerFn.apply(context.element());
- }
- }));
+ .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn)));
return PDone.in(input.getPipeline());
}
@@ -614,15 +606,7 @@ public class PAssert {
public PDone apply(PCollection<Iterable<T>> input) {
input
.apply("GroupGlobally", new GroupGlobally<Iterable<T>>())
- .apply(
- "RunChecks",
- ParDo.of(
- new DoFn<Iterable<Iterable<T>>, Void>() {
- @Override
- public void processElement(ProcessContext context) {
- checkerFn.apply(Iterables.getOnlyElement(context.element()));
- }
- }));
+ .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn)));
return PDone.in(input.getPipeline());
}
@@ -659,7 +643,7 @@ public class PAssert {
.apply(
ParDo.named("RunChecks")
.withSideInputs(actual)
- .of(new CheckerDoFn<>(checkerFn, actual)));
+ .of(new SideInputCheckerDoFn<>(checkerFn, actual)));
return PDone.in(input.getPipeline());
}
@@ -672,7 +656,7 @@ public class PAssert {
* <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support
* null values.
*/
- private static class CheckerDoFn<ActualT> extends DoFn<Integer, Void> {
+ private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> {
private final SerializableFunction<ActualT, Void> checkerFn;
private final Aggregator<Integer, Integer> success =
createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -680,7 +664,7 @@ public class PAssert {
createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
private final PCollectionView<ActualT> actual;
- private CheckerDoFn(
+ private SideInputCheckerDoFn(
SerializableFunction<ActualT, Void> checkerFn, PCollectionView<ActualT> actual) {
this.checkerFn = checkerFn;
this.actual = actual;
@@ -690,12 +674,40 @@ public class PAssert {
public void processElement(ProcessContext c) {
try {
ActualT actualContents = c.sideInput(actual);
- checkerFn.apply(actualContents);
- success.addValue(1);
+ doChecks(actualContents, checkerFn, success, failure);
+ } catch (Throwable t) {
+ // Suppress exception in streaming
+ if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
+ throw t;
+ }
+ }
+ }
+ }
+
+ /**
+ * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
+ * the single iterable element of the input {@link PCollection} and adjusts counters and
+ * thrown exceptions for use in testing.
+ *
+ * <p>The singleton property is presumed, not enforced.
+ */
+ private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> {
+ private final SerializableFunction<ActualT, Void> checkerFn;
+ private final Aggregator<Integer, Integer> success =
+ createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+ private final Aggregator<Integer, Integer> failure =
+ createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+
+ private GroupedValuesCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
+ this.checkerFn = checkerFn;
+ }
+
+ @Override
+ public void processElement(ProcessContext c) {
+ try {
+ doChecks(c.element(), checkerFn, success, failure);
} catch (Throwable t) {
- LOG.error("PAssert failed expectations.", t);
- failure.addValue(1);
- // TODO: allow for metrics to propagate on failure when running a streaming pipeline
+ // Suppress exception in streaming
if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
throw t;
}
@@ -703,6 +715,54 @@ public class PAssert {
}
}
+ /**
+ * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
+ * the single item contained within the single iterable on input and
+ * adjusts counters and thrown exceptions for use in testing.
+ *
+ * <p>The singleton property of the input {@link PCollection} is presumed, not enforced. However,
+ * each input element must be a singleton iterable, or this will fail.
+ */
+ private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> {
+ private final SerializableFunction<ActualT, Void> checkerFn;
+ private final Aggregator<Integer, Integer> success =
+ createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+ private final Aggregator<Integer, Integer> failure =
+ createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+
+ private SingletonCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
+ this.checkerFn = checkerFn;
+ }
+
+ @Override
+ public void processElement(ProcessContext c) {
+ try {
+ ActualT actualContents = Iterables.getOnlyElement(c.element());
+ doChecks(actualContents, checkerFn, success, failure);
+ } catch (Throwable t) {
+ // Suppress exception in streaming
+ if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
+ throw t;
+ }
+ }
+ }
+ }
+
+ private static <ActualT> void doChecks(
+ ActualT actualContents,
+ SerializableFunction<ActualT, Void> checkerFn,
+ Aggregator<Integer, Integer> successAggregator,
+ Aggregator<Integer, Integer> failureAggregator) {
+ try {
+ checkerFn.apply(actualContents);
+ successAggregator.addValue(1);
+ } catch (Throwable t) {
+ LOG.error("PAssert failed expectations.", t);
+ failureAggregator.addValue(1);
+ throw t;
+ }
+ }
+
/////////////////////////////////////////////////////////////////////////////
/**
[2/2] incubator-beam git commit: This closes #448
Posted by ke...@apache.org.
This closes #448
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a3feeefa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a3feeefa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a3feeefa
Branch: refs/heads/master
Commit: a3feeefa2db3f759f88cacd8cb89268263ff955a
Parents: fe5b8db dd07cba
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jun 13 10:11:22 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jun 13 10:11:22 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/testing/PAssert.java | 112 ++++++++++++++-----
1 file changed, 86 insertions(+), 26 deletions(-)
----------------------------------------------------------------------