You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 16:28:37 UTC

[19/50] incubator-beam git commit: Port some of ParDoTest to new DoFn

Port some of ParDoTest to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/30940179
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/30940179
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/30940179

Branch: refs/heads/apex-runner
Commit: 3094017956913b583a9bd8be5ce685683b591669
Parents: c2e751f
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 12:35:03 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/transforms/DoFn.java   |  7 ++++---
 .../java/org/apache/beam/sdk/util/StringUtils.java  |  2 +-
 .../org/apache/beam/sdk/transforms/ParDoTest.java   | 16 +++++++---------
 3 files changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30940179/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 11ca853..018877f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -216,9 +215,11 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
             String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
 
     /**
-     * Sets up {@link Aggregator}s created by the {@link DoFn} so they are usable within this context.
+     * Sets up {@link Aggregator}s created by the {@link DoFn} so they are usable within this
+     * context.
      *
-     * <p>This method should be called by runners before the {@link StartBundle @StartBundle} method.
+     * <p>This method should be called by runners before the {@link StartBundle @StartBundle}
+     * method.
      */
     @Experimental(Kind.AGGREGATOR)
     protected final void setupDelegateAggregators() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30940179/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
index 4f81eef..1c52c1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
@@ -96,7 +96,7 @@ public class StringUtils {
   }
 
   private static final String[] STANDARD_NAME_SUFFIXES =
-      new String[]{"OldDoFn", "Fn"};
+      new String[]{"OldDoFn", "DoFn", "Fn"};
 
   /**
    * Pattern to match a non-anonymous inner class.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30940179/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 52244a0..d3ea9fb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -51,12 +51,12 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
@@ -90,13 +90,11 @@ public class ParDoTest implements Serializable {
   @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
-  private static class PrintingOldDoFn extends OldDoFn<String, String> implements
-      RequiresWindowAccess {
-
-    @Override
-    public void processElement(ProcessContext c) {
+  private static class PrintingDoFn extends DoFn<String, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
       c.output(c.element() + ":" + c.timestamp().getMillis()
-          + ":" + c.window().maxTimestamp().getMillis());
+          + ":" + window.maxTimestamp().getMillis());
     }
   }
 
@@ -848,7 +846,7 @@ public class ParDoTest implements Serializable {
           output5.getName());
     }
 
-    assertEquals("ParDo(Printing)", ParDo.of(new PrintingOldDoFn()).getName());
+    assertEquals("ParDo(Printing)", ParDo.of(new PrintingDoFn()).getName());
 
     assertEquals(
         "ParMultiDo(SideOutputDummy)",
@@ -1381,7 +1379,7 @@ public class ParDoTest implements Serializable {
                     System.out.println("Finish: 3");
                   }
                 }))
-        .apply(ParDo.of(new PrintingOldDoFn()));
+        .apply(ParDo.of(new PrintingDoFn()));
 
     PAssert.that(output).satisfies(new Checker());