You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/05 03:10:48 UTC

[6/9] incubator-beam git commit: Port PAssert to new DoFn

Port PAssert to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ef5e31f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ef5e31f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ef5e31f8

Branch: refs/heads/master
Commit: ef5e31f8b79dcedf8feb4bba0e313bfcf330ab1e
Parents: 1959ddb
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 20:15:58 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/testing/PAssert.java    | 39 ++++++++++----------
 1 file changed, 19 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef5e31f8/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 80340c2..e07ee3d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -33,11 +33,10 @@ import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -762,8 +761,8 @@ public class PAssert {
           .apply("RewindowActuals", rewindowActuals.<T>windowActuals())
           .apply(
               ParDo.of(
-                  new OldDoFn<T, T>() {
-                    @Override
+                  new DoFn<T, T>() {
+                    @ProcessElement
                     public void processElement(ProcessContext context) throws CoderException {
                       context.output(CoderUtils.clone(coder, context.element()));
                     }
@@ -884,8 +883,8 @@ public class PAssert {
     }
   }
 
-  private static final class ConcatFn<T> extends OldDoFn<Iterable<Iterable<T>>, Iterable<T>> {
-    @Override
+  private static final class ConcatFn<T> extends DoFn<Iterable<Iterable<T>>, Iterable<T>> {
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(Iterables.concat(c.element()));
     }
@@ -995,13 +994,13 @@ public class PAssert {
   }
 
   /**
-   * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of a
+   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of a
    * {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
    *
    * <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support
    * null values.
    */
-  private static class SideInputCheckerDoFn<ActualT> extends OldDoFn<Integer, Void> {
+  private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
         createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1015,7 +1014,7 @@ public class PAssert {
       this.actual = actual;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       try {
         ActualT actualContents = c.sideInput(actual);
@@ -1030,13 +1029,13 @@ public class PAssert {
   }
 
   /**
-   * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of
+   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
    * the single iterable element of the input {@link PCollection} and adjusts counters and
    * thrown exceptions for use in testing.
    *
    * <p>The singleton property is presumed, not enforced.
    */
-  private static class GroupedValuesCheckerDoFn<ActualT> extends OldDoFn<ActualT, Void> {
+  private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
         createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1047,7 +1046,7 @@ public class PAssert {
       this.checkerFn = checkerFn;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       try {
         doChecks(c.element(), checkerFn, success, failure);
@@ -1061,14 +1060,14 @@ public class PAssert {
   }
 
   /**
-   * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of
+   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
    * the single item contained within the single iterable on input and
    * adjusts counters and thrown exceptions for use in testing.
    *
    * <p>The singleton property of the input {@link PCollection} is presumed, not enforced. However,
    * each input element must be a singleton iterable, or this will fail.
    */
-  private static class SingletonCheckerDoFn<ActualT> extends OldDoFn<Iterable<ActualT>, Void> {
+  private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
         createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1079,7 +1078,7 @@ public class PAssert {
       this.checkerFn = checkerFn;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       try {
         ActualT actualContents = Iterables.getOnlyElement(c.element());
@@ -1310,7 +1309,7 @@ public class PAssert {
   }
 
   /**
-   * A OldDoFn that filters elements based on their presence in a static collection of windows.
+   * A DoFn that filters elements based on their presence in a static collection of windows.
    */
   private static final class FilterWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
     private final StaticWindows windows;
@@ -1324,10 +1323,10 @@ public class PAssert {
       return input.apply("FilterWindows", ParDo.of(new Fn()));
     }
 
-    private class Fn extends OldDoFn<T, T> implements RequiresWindowAccess {
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
-        if (windows.getWindows().contains(c.window())) {
+    private class Fn extends DoFn<T, T> {
+      @ProcessElement
+      public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+        if (windows.getWindows().contains(window)) {
           c.output(c.element());
         }
       }