You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 16:28:37 UTC
[19/50] incubator-beam git commit: Port some of ParDoTest to new DoFn
Port some of ParDoTest to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/30940179
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/30940179
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/30940179
Branch: refs/heads/apex-runner
Commit: 3094017956913b583a9bd8be5ce685683b591669
Parents: c2e751f
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 12:35:03 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/transforms/DoFn.java | 7 ++++---
.../java/org/apache/beam/sdk/util/StringUtils.java | 2 +-
.../org/apache/beam/sdk/transforms/ParDoTest.java | 16 +++++++---------
3 files changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30940179/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 11ca853..018877f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -216,9 +215,11 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
/**
- * Sets up {@link Aggregator}s created by the {@link DoFn} so they are usable within this context.
+ * Sets up {@link Aggregator}s created by the {@link DoFn} so they are usable within this
+ * context.
*
- * <p>This method should be called by runners before the {@link StartBundle @StartBundle} method.
+ * <p>This method should be called by runners before the {@link StartBundle @StartBundle}
+ * method.
*/
@Experimental(Kind.AGGREGATOR)
protected final void setupDelegateAggregators() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30940179/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
index 4f81eef..1c52c1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
@@ -96,7 +96,7 @@ public class StringUtils {
}
private static final String[] STANDARD_NAME_SUFFIXES =
- new String[]{"OldDoFn", "Fn"};
+ new String[]{"OldDoFn", "DoFn", "Fn"};
/**
* Pattern to match a non-anonymous inner class.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30940179/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 52244a0..d3ea9fb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -51,12 +51,12 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.ParDo.Bound;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
@@ -90,13 +90,11 @@ public class ParDoTest implements Serializable {
@Rule
public transient ExpectedException thrown = ExpectedException.none();
- private static class PrintingOldDoFn extends OldDoFn<String, String> implements
- RequiresWindowAccess {
-
- @Override
- public void processElement(ProcessContext c) {
+ private static class PrintingDoFn extends DoFn<String, String> {
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) {
c.output(c.element() + ":" + c.timestamp().getMillis()
- + ":" + c.window().maxTimestamp().getMillis());
+ + ":" + window.maxTimestamp().getMillis());
}
}
@@ -848,7 +846,7 @@ public class ParDoTest implements Serializable {
output5.getName());
}
- assertEquals("ParDo(Printing)", ParDo.of(new PrintingOldDoFn()).getName());
+ assertEquals("ParDo(Printing)", ParDo.of(new PrintingDoFn()).getName());
assertEquals(
"ParMultiDo(SideOutputDummy)",
@@ -1381,7 +1379,7 @@ public class ParDoTest implements Serializable {
System.out.println("Finish: 3");
}
}))
- .apply(ParDo.of(new PrintingOldDoFn()));
+ .apply(ParDo.of(new PrintingDoFn()));
PAssert.that(output).satisfies(new Checker());