You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/13 17:11:37 UTC

[1/2] incubator-beam git commit: Add success/failure counters to new PAssert mechanism

Repository: incubator-beam
Updated Branches:
  refs/heads/master fe5b8db6c -> a3feeefa2


Add success/failure counters to new PAssert mechanism


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd07cba3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd07cba3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd07cba3

Branch: refs/heads/master
Commit: dd07cba31ef1686495562914de13f1ca24ff9ce2
Parents: 60964b6
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Jun 12 18:29:46 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Jun 12 18:29:46 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/testing/PAssert.java    | 112 ++++++++++++++-----
 1 file changed, 86 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd07cba3/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index b10c1cb..62d3599 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -584,15 +584,7 @@ public class PAssert {
     public PDone apply(PCollection<T> input) {
       input
           .apply("GroupGlobally", new GroupGlobally<T>())
-          .apply(
-              "RunChecks",
-              ParDo.of(
-                  new DoFn<Iterable<T>, Void>() {
-                    @Override
-                    public void processElement(ProcessContext context) {
-                      checkerFn.apply(context.element());
-                    }
-                  }));
+          .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn)));
 
       return PDone.in(input.getPipeline());
     }
@@ -614,15 +606,7 @@ public class PAssert {
     public PDone apply(PCollection<Iterable<T>> input) {
       input
           .apply("GroupGlobally", new GroupGlobally<Iterable<T>>())
-          .apply(
-              "RunChecks",
-              ParDo.of(
-                  new DoFn<Iterable<Iterable<T>>, Void>() {
-                    @Override
-                    public void processElement(ProcessContext context) {
-                      checkerFn.apply(Iterables.getOnlyElement(context.element()));
-                    }
-                  }));
+          .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn)));
 
       return PDone.in(input.getPipeline());
     }
@@ -659,7 +643,7 @@ public class PAssert {
           .apply(
               ParDo.named("RunChecks")
                   .withSideInputs(actual)
-                  .of(new CheckerDoFn<>(checkerFn, actual)));
+                  .of(new SideInputCheckerDoFn<>(checkerFn, actual)));
 
       return PDone.in(input.getPipeline());
     }
@@ -672,7 +656,7 @@ public class PAssert {
    * <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support
    * null values.
    */
-  private static class CheckerDoFn<ActualT> extends DoFn<Integer, Void> {
+  private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
         createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -680,7 +664,7 @@ public class PAssert {
         createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
     private final PCollectionView<ActualT> actual;
 
-    private CheckerDoFn(
+    private SideInputCheckerDoFn(
         SerializableFunction<ActualT, Void> checkerFn, PCollectionView<ActualT> actual) {
       this.checkerFn = checkerFn;
       this.actual = actual;
@@ -690,12 +674,40 @@ public class PAssert {
     public void processElement(ProcessContext c) {
       try {
         ActualT actualContents = c.sideInput(actual);
-        checkerFn.apply(actualContents);
-        success.addValue(1);
+        doChecks(actualContents, checkerFn, success, failure);
+      } catch (Throwable t) {
+        // Suppress exception in streaming
+        if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
+          throw t;
+        }
+      }
+    }
+  }
+
+  /**
+   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
+   * the single iterable element of the input {@link PCollection} and adjusts counters and
+   * thrown exceptions for use in testing.
+   *
+   * <p>The singleton property is presumed, not enforced.
+   */
+  private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> {
+    private final SerializableFunction<ActualT, Void> checkerFn;
+    private final Aggregator<Integer, Integer> success =
+        createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+    private final Aggregator<Integer, Integer> failure =
+        createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+
+    private GroupedValuesCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
+      this.checkerFn = checkerFn;
+    }
+
+    @Override
+    public void processElement(ProcessContext c) {
+      try {
+        doChecks(c.element(), checkerFn, success, failure);
       } catch (Throwable t) {
-        LOG.error("PAssert failed expectations.", t);
-        failure.addValue(1);
-        // TODO: allow for metrics to propagate on failure when running a streaming pipeline
+        // Suppress exception in streaming
         if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
           throw t;
         }
@@ -703,6 +715,54 @@ public class PAssert {
     }
   }
 
+  /**
+   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
+   * the single item contained within the single iterable on input and
+   * adjusts counters and thrown exceptions for use in testing.
+   *
+   * <p>The singleton property of the input {@link PCollection} is presumed, not enforced. However,
+   * each input element must be a singleton iterable, or this will fail.
+   */
+  private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> {
+    private final SerializableFunction<ActualT, Void> checkerFn;
+    private final Aggregator<Integer, Integer> success =
+        createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+    private final Aggregator<Integer, Integer> failure =
+        createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
+
+    private SingletonCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
+      this.checkerFn = checkerFn;
+    }
+
+    @Override
+    public void processElement(ProcessContext c) {
+      try {
+        ActualT actualContents = Iterables.getOnlyElement(c.element());
+        doChecks(actualContents, checkerFn, success, failure);
+      } catch (Throwable t) {
+        // Suppress exception in streaming
+        if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
+          throw t;
+        }
+      }
+    }
+  }
+
+  private static <ActualT> void doChecks(
+      ActualT actualContents,
+      SerializableFunction<ActualT, Void> checkerFn,
+      Aggregator<Integer, Integer> successAggregator,
+      Aggregator<Integer, Integer> failureAggregator) {
+    try {
+      checkerFn.apply(actualContents);
+      successAggregator.addValue(1);
+    } catch (Throwable t) {
+      LOG.error("PAssert failed expectations.", t);
+      failureAggregator.addValue(1);
+      throw t;
+    }
+  }
+
   /////////////////////////////////////////////////////////////////////////////
 
   /**


[2/2] incubator-beam git commit: This closes #448

Posted by ke...@apache.org.
This closes #448


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a3feeefa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a3feeefa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a3feeefa

Branch: refs/heads/master
Commit: a3feeefa2db3f759f88cacd8cb89268263ff955a
Parents: fe5b8db dd07cba
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jun 13 10:11:22 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jun 13 10:11:22 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/testing/PAssert.java    | 112 ++++++++++++++-----
 1 file changed, 86 insertions(+), 26 deletions(-)
----------------------------------------------------------------------