You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/15 22:29:05 UTC
[05/10] incubator-beam git commit: Removes OldDoFn from ParDo
Removes OldDoFn from ParDo
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e9e53c5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e9e53c5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e9e53c5d
Branch: refs/heads/master
Commit: e9e53c5d037561aa4dcacfcde69d76a03f3a1571
Parents: 8330bfa
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 17:13:43 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Dec 15 13:58:43 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/ParDo.java | 167 +++----------------
.../apache/beam/sdk/transforms/OldDoFnTest.java | 125 ++++----------
2 files changed, 55 insertions(+), 237 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e9e53c5d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 167f5fa..d2149c0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -530,23 +529,6 @@ public class ParDo {
return new Unbound().of(fn, displayDataForFn(fn));
}
- /**
- * Creates a {@link ParDo} {@link PTransform} that will invoke the
- * given {@link OldDoFn} function.
- *
- * <p>The resulting {@link PTransform PTransform's} types have been bound, with the
- * input being a {@code PCollection<InputT>} and the output a
- * {@code PCollection<OutputT>}, inferred from the types of the argument
- * {@code OldDoFn<InputT, OutputT>}. It is ready to be applied, or further
- * properties can be set on it first.
- *
- * @deprecated please port your {@link OldDoFn} to a {@link DoFn}
- */
- @Deprecated
- public static <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
- return new Unbound().of(fn, displayDataForFn(fn));
- }
-
private static <T> DisplayData.ItemSpec<? extends Class<?>> displayDataForFn(T fn) {
return DisplayData.item("fn", fn.getClass()).withLabel("Transform Function");
}
@@ -557,12 +539,7 @@ public class ParDo {
* the {@link PCollection}.
*/
private static <InputT, OutputT> void validateWindowType(
- PCollection<? extends InputT> input, Serializable fn) {
- // No validation for OldDoFn
- if (!(fn instanceof DoFn)) {
- return;
- }
-
+ PCollection<? extends InputT> input, DoFn<InputT, OutputT> fn) {
DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass());
TypeDescriptor<? extends BoundedWindow> actualWindowT =
@@ -609,10 +586,6 @@ public class ParDo {
}
}
- private static <InputT, OutputT> OldDoFn<InputT, OutputT> adapt(DoFn<InputT, OutputT> fn) {
- return DoFnAdapters.toOldDoFn(fn);
- }
-
/**
* An incomplete {@link ParDo} transform, with unbound input/output types.
*
@@ -688,24 +661,9 @@ public class ParDo {
return new UnboundMulti<>(name, sideInputs, mainOutputTag, sideOutputTags);
}
- /**
- * Returns a new {@link ParDo} {@link PTransform} that's like this
- * transform but that will invoke the given {@link OldDoFn}
- * function, and that has its input and output types bound. Does
- * not modify this transform. The resulting {@link PTransform} is
- * sufficiently specified to be applied, but more properties can
- * still be specified.
- *
- * @deprecated please port your {@link OldDoFn} to a {@link DoFn}
- */
- @Deprecated
- public <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> oldFn) {
- return of(oldFn, displayDataForFn(oldFn));
- }
-
private <InputT, OutputT> Bound<InputT, OutputT> of(
- Serializable originalFn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
- return new Bound<>(name, originalFn, sideInputs, fnDisplayData);
+ DoFn<InputT, OutputT> doFn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
+ return new Bound<>(name, doFn, sideInputs, fnDisplayData);
}
}
@@ -725,12 +683,12 @@ public class ParDo {
extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
// Inherits name.
private final List<PCollectionView<?>> sideInputs;
- private final Serializable fn;
+ private final DoFn<InputT, OutputT> fn;
private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
Bound(
String name,
- Serializable fn,
+ DoFn<InputT, OutputT> fn,
List<PCollectionView<?>> sideInputs,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
super(name);
@@ -787,7 +745,7 @@ public class ParDo {
@Override
public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
checkArgument(
- !isSplittable(getOldFn()),
+ !isSplittable(getNewFn()),
"%s does not support Splittable DoFn",
input.getPipeline().getOptions().getRunner().getName());
validateWindowType(input, fn);
@@ -795,7 +753,7 @@ public class ParDo {
input.getPipeline(),
input.getWindowingStrategy(),
input.isBounded())
- .setTypeDescriptor(getOldFn().getOutputTypeDescriptor());
+ .setTypeDescriptor(getNewFn().getOutputTypeDescriptor());
}
@Override
@@ -803,14 +761,14 @@ public class ParDo {
protected Coder<OutputT> getDefaultOutputCoder(PCollection<? extends InputT> input)
throws CannotProvideCoderException {
return input.getPipeline().getCoderRegistry().getDefaultCoder(
- getOldFn().getOutputTypeDescriptor(),
- getOldFn().getInputTypeDescriptor(),
+ getNewFn().getOutputTypeDescriptor(),
+ getNewFn().getInputTypeDescriptor(),
((PCollection<InputT>) input).getCoder());
}
@Override
protected String getKindString() {
- Class<?> clazz = DoFnAdapters.getDoFnClass(getOldFn());
+ Class<?> clazz = getNewFn().getClass();
if (clazz.isAnonymousClass()) {
return "AnonymousParDo";
} else {
@@ -831,44 +789,7 @@ public class ParDo {
ParDo.populateDisplayData(builder, (HasDisplayData) fn, fnDisplayData);
}
- /**
- * @deprecated this method to be converted to return {@link DoFn}. If you want to receive
- * an {@link OldDoFn} you should (temporarily) use {@link #getOldFn}.
- */
- @Deprecated
- public OldDoFn<InputT, OutputT> getFn() {
- return getOldFn();
- }
-
- /**
- * @deprecated please migrate to {@link #getNewFn} until {@link #getFn} is migrated to return
- * a {@link DoFn}.
- */
- @Deprecated
- public OldDoFn<InputT, OutputT> getOldFn() {
- if (fn instanceof OldDoFn) {
- return (OldDoFn<InputT, OutputT>) fn;
- } else {
- return adapt((DoFn<InputT, OutputT>) fn);
- }
- }
-
public DoFn<InputT, OutputT> getNewFn() {
- if (fn instanceof DoFn) {
- return (DoFn<InputT, OutputT>) fn;
- } else {
- return ((OldDoFn<InputT, OutputT>) fn).toDoFn();
- }
- }
-
- /**
- * Returns the {@link OldDoFn} or {@link DoFn} used to create this transform.
- *
- * @deprecated for migration purposes only. There are some cases of {@link OldDoFn} that are not
- * fully supported by wrapping it into a {@link DoFn}, such as {@link RequiresWindowAccess}.
- */
- @Deprecated
- public Object getOriginalFn() {
return fn;
}
@@ -951,23 +872,8 @@ public class ParDo {
return of(fn, displayDataForFn(fn));
}
- /**
- * Returns a new multi-output {@link ParDo} {@link PTransform}
- * that's like this transform but that will invoke the given
- * {@link OldDoFn} function, and that has its input type bound.
- * Does not modify this transform. The resulting
- * {@link PTransform} is sufficiently specified to be applied, but
- * more properties can still be specified.
- *
- * @deprecated please port your {@link OldDoFn} to a {@link DoFn}
- */
- @Deprecated
- public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
- return of(fn, displayDataForFn(fn));
- }
-
private <InputT> BoundMulti<InputT, OutputT> of(
- Serializable fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
+ DoFn<InputT, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
return new BoundMulti<>(name, fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData);
}
}
@@ -990,11 +896,11 @@ public class ParDo {
private final TupleTag<OutputT> mainOutputTag;
private final TupleTagList sideOutputTags;
private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
- private final Serializable fn;
+ private final DoFn<InputT, OutputT> fn;
BoundMulti(
String name,
- Serializable fn,
+ DoFn<InputT, OutputT> fn,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
TupleTagList sideOutputTags,
@@ -1046,7 +952,7 @@ public class ParDo {
@Override
public PCollectionTuple expand(PCollection<? extends InputT> input) {
checkArgument(
- !isSplittable(getOldFn()),
+ !isSplittable(getNewFn()),
"%s does not support Splittable DoFn",
input.getPipeline().getOptions().getRunner().getName());
validateWindowType(input, fn);
@@ -1059,7 +965,7 @@ public class ParDo {
// The fn will likely be an instance of an anonymous subclass
// such as DoFn<Integer, String> { }, thus will have a high-fidelity
// TypeDescriptor for the output type.
- outputs.get(mainOutputTag).setTypeDescriptor(getOldFn().getOutputTypeDescriptor());
+ outputs.get(mainOutputTag).setTypeDescriptor(getNewFn().getOutputTypeDescriptor());
return outputs;
}
@@ -1084,7 +990,7 @@ public class ParDo {
@Override
protected String getKindString() {
- Class<?> clazz = DoFnAdapters.getDoFnClass(getOldFn());
+ Class<?> clazz = getNewFn().getClass();
if (clazz.isAnonymousClass()) {
return "AnonymousParMultiDo";
} else {
@@ -1095,37 +1001,11 @@ public class ParDo {
@Override
public void populateDisplayData(Builder builder) {
super.populateDisplayData(builder);
- ParDo.populateDisplayData(builder, (HasDisplayData) fn, fnDisplayData);
- }
-
- /**
- * @deprecated this method to be converted to return {@link DoFn}. If you want to receive
- * an {@link OldDoFn} you should (temporarily) use {@link #getOldFn}.
- */
- @Deprecated
- public OldDoFn<InputT, OutputT> getFn() {
- return getOldFn();
- }
-
- /**
- * @deprecated please migrate to {@link #getNewFn} until {@link #getFn} is migrated to return
- * a {@link DoFn}.
- */
- @Deprecated
- public OldDoFn<InputT, OutputT> getOldFn() {
- if (fn instanceof OldDoFn) {
- return (OldDoFn<InputT, OutputT>) fn;
- } else {
- return adapt((DoFn<InputT, OutputT>) fn);
- }
+ ParDo.populateDisplayData(builder, fn, fnDisplayData);
}
public DoFn<InputT, OutputT> getNewFn() {
- if (fn instanceof DoFn) {
- return (DoFn<InputT, OutputT>) fn;
- } else {
- return ((OldDoFn<InputT, OutputT>) fn).toDoFn();
- }
+ return fn;
}
public TupleTag<OutputT> getMainOutputTag() {
@@ -1148,14 +1028,7 @@ public class ParDo {
builder.include("fn", fn).add(fnDisplayData);
}
- private static boolean isSplittable(OldDoFn<?, ?> oldDoFn) {
- DoFn<?, ?> fn = DoFnAdapters.getDoFn(oldDoFn);
- if (fn == null) {
- return false;
- }
- return DoFnSignatures
- .getSignature(fn.getClass())
- .processElement()
- .isSplittable();
+ private static boolean isSplittable(DoFn<?, ?> fn) {
+ return DoFnSignatures.signatureForDoFn(fn).processElement().isSplittable();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e9e53c5d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
index 07e3078..cc84252 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
@@ -18,28 +18,20 @@
package org.apache.beam.sdk.transforms;
import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import java.io.Serializable;
-import java.util.Map;
-import org.apache.beam.sdk.AggregatorValues;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
-import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -134,68 +126,52 @@ public class OldDoFnTest implements Serializable {
}
@Test
- @Category(NeedsRunner.class)
- public void testCreateAggregatorInStartBundleThrows() {
- TestPipeline p = createTestPipeline(new OldDoFn<String, String>() {
+ public void testCreateAggregatorThrowsWhenAggregatorsAreFinal() throws Exception {
+ OldDoFn<String, String> fn = new OldDoFn<String, String>() {
@Override
- public void startBundle(OldDoFn<String, String>.Context c) throws Exception {
- createAggregator("anyAggregate", new MaxIntegerFn());
- }
-
- @Override
- public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception {}
- });
-
- thrown.expect(PipelineExecutionException.class);
- thrown.expectCause(isA(IllegalStateException.class));
+ public void processElement(ProcessContext c) throws Exception { }
+ };
+ OldDoFn<String, String>.Context context = createContext(fn);
+ context.setupDelegateAggregators();
- p.run();
+ thrown.expect(isA(IllegalStateException.class));
+ fn.createAggregator("anyAggregate", new MaxIntegerFn());
}
- @Test
- @Category(NeedsRunner.class)
- public void testCreateAggregatorInProcessElementThrows() {
- TestPipeline p = createTestPipeline(new OldDoFn<String, String>() {
+ private OldDoFn<String, String>.Context createContext(OldDoFn<String, String> fn) {
+ return fn.new Context() {
@Override
- public void processElement(ProcessContext c) throws Exception {
- createAggregator("anyAggregate", new MaxIntegerFn());
+ public PipelineOptions getPipelineOptions() {
+ throw new UnsupportedOperationException();
}
- });
-
- thrown.expect(PipelineExecutionException.class);
- thrown.expectCause(isA(IllegalStateException.class));
-
- p.run();
- }
- @Test
- @Category(NeedsRunner.class)
- public void testCreateAggregatorInFinishBundleThrows() {
- TestPipeline p = createTestPipeline(new OldDoFn<String, String>() {
@Override
- public void finishBundle(OldDoFn<String, String>.Context c) throws Exception {
- createAggregator("anyAggregate", new MaxIntegerFn());
+ public void output(String output) {
+ throw new UnsupportedOperationException();
}
@Override
- public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception {}
- });
-
- thrown.expect(PipelineExecutionException.class);
- thrown.expectCause(isA(IllegalStateException.class));
+ public void outputWithTimestamp(String output, Instant timestamp) {
+ throw new UnsupportedOperationException();
+ }
- p.run();
- }
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ throw new UnsupportedOperationException();
+ }
- /**
- * Initialize a test pipeline with the specified {@link OldDoFn}.
- */
- private <InputT, OutputT> TestPipeline createTestPipeline(OldDoFn<InputT, OutputT> fn) {
- TestPipeline pipeline = TestPipeline.create();
- pipeline.apply(Create.of((InputT) null))
- .apply(ParDo.of(fn));
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ throw new UnsupportedOperationException();
+ }
- return pipeline;
+ @Override
+ public <AggInputT, AggOutputT>
+ Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+ String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ throw new UnsupportedOperationException();
+ }
+ };
}
@Test
@@ -209,35 +185,4 @@ public class OldDoFnTest implements Serializable {
DisplayData data = DisplayData.from(usesDefault);
assertThat(data.items(), empty());
}
-
- @Test
- @Category(NeedsRunner.class)
- public void testAggregators() throws Exception {
- Pipeline pipeline = TestPipeline.create();
-
- CountOddsFn countOdds = new CountOddsFn();
- PCollection<Void> output = pipeline
- .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100))
- .apply(ParDo.of(countOdds));
- PipelineResult result = pipeline.run();
-
- AggregatorValues<Integer> values = result.getAggregatorValues(countOdds.aggregator);
-
- Map<String, Integer> valuesMap = values.getValuesAtSteps();
-
- assertThat(valuesMap.size(), equalTo(1));
- assertThat(valuesMap.get(output.getProducingTransformInternal().getFullName()), equalTo(4));
- }
-
- private static class CountOddsFn extends OldDoFn<Integer, Void> {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- if (c.element() % 2 == 1) {
- aggregator.addValue(1);
- }
- }
-
- Aggregator<Integer, Integer> aggregator =
- createAggregator("odds", new SumIntegerFn());
- }
}