You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/05 03:10:48 UTC
[6/9] incubator-beam git commit: Port PAssert to new DoFn
Port PAssert to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ef5e31f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ef5e31f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ef5e31f8
Branch: refs/heads/master
Commit: ef5e31f8b79dcedf8feb4bba0e313bfcf330ab1e
Parents: 1959ddb
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 20:15:58 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/testing/PAssert.java | 39 ++++++++++----------
1 file changed, 19 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef5e31f8/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 80340c2..e07ee3d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -33,11 +33,10 @@ import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -762,8 +761,8 @@ public class PAssert {
.apply("RewindowActuals", rewindowActuals.<T>windowActuals())
.apply(
ParDo.of(
- new OldDoFn<T, T>() {
- @Override
+ new DoFn<T, T>() {
+ @ProcessElement
public void processElement(ProcessContext context) throws CoderException {
context.output(CoderUtils.clone(coder, context.element()));
}
@@ -884,8 +883,8 @@ public class PAssert {
}
}
- private static final class ConcatFn<T> extends OldDoFn<Iterable<Iterable<T>>, Iterable<T>> {
- @Override
+ private static final class ConcatFn<T> extends DoFn<Iterable<Iterable<T>>, Iterable<T>> {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(Iterables.concat(c.element()));
}
@@ -995,13 +994,13 @@ public class PAssert {
}
/**
- * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of a
+ * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of a
* {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
*
* <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support
* null values.
*/
- private static class SideInputCheckerDoFn<ActualT> extends OldDoFn<Integer, Void> {
+ private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> {
private final SerializableFunction<ActualT, Void> checkerFn;
private final Aggregator<Integer, Integer> success =
createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1015,7 +1014,7 @@ public class PAssert {
this.actual = actual;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
try {
ActualT actualContents = c.sideInput(actual);
@@ -1030,13 +1029,13 @@ public class PAssert {
}
/**
- * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of
+ * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
* the single iterable element of the input {@link PCollection} and adjusts counters and
* thrown exceptions for use in testing.
*
* <p>The singleton property is presumed, not enforced.
*/
- private static class GroupedValuesCheckerDoFn<ActualT> extends OldDoFn<ActualT, Void> {
+ private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> {
private final SerializableFunction<ActualT, Void> checkerFn;
private final Aggregator<Integer, Integer> success =
createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1047,7 +1046,7 @@ public class PAssert {
this.checkerFn = checkerFn;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
try {
doChecks(c.element(), checkerFn, success, failure);
@@ -1061,14 +1060,14 @@ public class PAssert {
}
/**
- * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of
+ * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
* the single item contained within the single iterable on input and
* adjusts counters and thrown exceptions for use in testing.
*
* <p>The singleton property of the input {@link PCollection} is presumed, not enforced. However,
* each input element must be a singleton iterable, or this will fail.
*/
- private static class SingletonCheckerDoFn<ActualT> extends OldDoFn<Iterable<ActualT>, Void> {
+ private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> {
private final SerializableFunction<ActualT, Void> checkerFn;
private final Aggregator<Integer, Integer> success =
createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1079,7 +1078,7 @@ public class PAssert {
this.checkerFn = checkerFn;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
try {
ActualT actualContents = Iterables.getOnlyElement(c.element());
@@ -1310,7 +1309,7 @@ public class PAssert {
}
/**
- * A OldDoFn that filters elements based on their presence in a static collection of windows.
+ * A DoFn that filters elements based on their presence in a static collection of windows.
*/
private static final class FilterWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
private final StaticWindows windows;
@@ -1324,10 +1323,10 @@ public class PAssert {
return input.apply("FilterWindows", ParDo.of(new Fn()));
}
- private class Fn extends OldDoFn<T, T> implements RequiresWindowAccess {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- if (windows.getWindows().contains(c.window())) {
+ private class Fn extends DoFn<T, T> {
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+ if (windows.getWindows().contains(window)) {
c.output(c.element());
}
}