You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 16:28:23 UTC
[05/50] incubator-beam git commit: Add OldDoFn.toDoFn() adapter
Add OldDoFn.toDoFn() adapter
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/44878e57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/44878e57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/44878e57
Branch: refs/heads/apex-runner
Commit: 44878e57eafc3ee6c437b8537dd1f88024006a51
Parents: fe0b7bf
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Oct 19 20:44:52 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 18:32:06 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/OldDoFn.java | 183 +++++++++++++++++++
1 file changed, 183 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44878e57/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index a445c7d..912bf24 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -75,6 +75,23 @@ import org.joda.time.Instant;
@Deprecated
public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData {
+ public DoFn<InputT, OutputT> toDoFn() {
+ if (this instanceof RequiresWindowAccess) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Cannot convert %s to %s because it implements %s."
+ + " Please convert your %s to a %s directly.",
+ getClass(),
+ DoFn.class.getSimpleName(),
+ RequiresWindowAccess.class.getSimpleName(),
+ OldDoFn.class.getSimpleName(),
+ DoFn.class.getSimpleName()));
+ }
+
+ // No parameters as it just accesses `this`
+ return new AdaptedDoFn();
+ }
+
/**
* Information accessible to all methods in this {@code OldDoFn}.
* Used primarily to output elements.
@@ -587,4 +604,170 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
return false;
}
}
+
+ /**
+ * A {@link Context} for an {@link OldDoFn} via a context for a proper {@link DoFn}.
+ */
+ private class AdaptedContext extends Context {
+
+ private final DoFn<InputT, OutputT>.Context newContext;
+
+ public AdaptedContext(
+ DoFn<InputT, OutputT>.Context newContext) {
+ this.newContext = newContext;
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return newContext.getPipelineOptions();
+ }
+
+ @Override
+ public void output(OutputT output) {
+ newContext.output(output);
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ newContext.outputWithTimestamp(output, timestamp);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ newContext.sideOutput(tag, output);
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ newContext.sideOutputWithTimestamp(tag, output, timestamp);
+ }
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+ String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ return null;
+ }
+ }
+
+ /**
+ * A {@link ProcessContext} for an {@link OldDoFn} via a context for a proper {@link DoFn}.
+ */
+ private class AdaptedProcessContext extends ProcessContext {
+
+ private final DoFn<InputT, OutputT>.ProcessContext newContext;
+
+ public AdaptedProcessContext(
+ DoFn<InputT, OutputT>.ProcessContext newContext) {
+ this.newContext = newContext;
+ }
+
+ @Override
+ public InputT element() {
+ return newContext.element();
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ return newContext.sideInput(view);
+ }
+
+ @Override
+ public Instant timestamp() {
+ return newContext.timestamp();
+ }
+
+ @Override
+ public BoundedWindow window() {
+ throw new UnsupportedOperationException(String.format(
+ "%s.%s.windowingInternals() is no longer supported. Please convert your %s to a %s",
+ OldDoFn.class.getSimpleName(),
+ OldDoFn.ProcessContext.class.getSimpleName(),
+ OldDoFn.class.getSimpleName(),
+ DoFn.class.getSimpleName()));
+ }
+
+ @Override
+ public PaneInfo pane() {
+ return newContext.pane();
+ }
+
+ @Override
+ public WindowingInternals<InputT, OutputT> windowingInternals() {
+ throw new UnsupportedOperationException(String.format(
+ "%s.%s.windowingInternals() is no longer supported. Please convert your %s to a %s",
+ OldDoFn.class.getSimpleName(),
+ OldDoFn.ProcessContext.class.getSimpleName(),
+ OldDoFn.class.getSimpleName(),
+ DoFn.class.getSimpleName()));
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return newContext.getPipelineOptions();
+ }
+
+ @Override
+ public void output(OutputT output) {
+ newContext.output(output);
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ newContext.outputWithTimestamp(output, timestamp);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ newContext.sideOutput(tag, output);
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ newContext.sideOutputWithTimestamp(tag, output, timestamp);
+ }
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+ String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ return null;
+ }
+ }
+
+ private class AdaptedDoFn extends DoFn<InputT, OutputT> {
+
+ @StartBundle
+ public void startBundle(DoFn.Context c) throws Exception {
+ OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c));
+ }
+
+ @ProcessElement
+ public void processElement(DoFn.ProcessContext c) throws Exception {
+ OldDoFn.this.processElement(OldDoFn.this.new AdaptedProcessContext(c));
+ }
+
+ @FinishBundle
+ public void finishBundle(DoFn.Context c) throws Exception {
+ OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c));
+ }
+
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return OldDoFn.this.getAllowedTimestampSkew();
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ OldDoFn.this.populateDisplayData(builder);
+ }
+
+ @Override
+ protected TypeDescriptor<InputT> getInputTypeDescriptor() {
+ return OldDoFn.this.getInputTypeDescriptor();
+ }
+
+ @Override
+ protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+ return OldDoFn.this.getOutputTypeDescriptor();
+ }
+ }
}