You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/21 02:53:12 UTC

[4/7] incubator-beam git commit: Add OldDoFn.toDoFn() adapter

Add OldDoFn.toDoFn() adapter


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/44878e57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/44878e57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/44878e57

Branch: refs/heads/master
Commit: 44878e57eafc3ee6c437b8537dd1f88024006a51
Parents: fe0b7bf
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Oct 19 20:44:52 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 18:32:06 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/OldDoFn.java | 183 +++++++++++++++++++
 1 file changed, 183 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44878e57/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index a445c7d..912bf24 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -75,6 +75,23 @@ import org.joda.time.Instant;
 @Deprecated
 public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData {
 
+  public DoFn<InputT, OutputT> toDoFn() {
+    if (this instanceof RequiresWindowAccess) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Cannot convert %s to %s because it implements %s."
+                  + " Please convert your %s to a %s directly.",
+              getClass(),
+              DoFn.class.getSimpleName(),
+              RequiresWindowAccess.class.getSimpleName(),
+              OldDoFn.class.getSimpleName(),
+              DoFn.class.getSimpleName()));
+    }
+
+    // No parameters as it just accesses `this`
+    return new AdaptedDoFn();
+  }
+
   /**
    * Information accessible to all methods in this {@code OldDoFn}.
    * Used primarily to output elements.
@@ -587,4 +604,170 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
       return false;
     }
   }
+
+  /**
+   * Presents the {@link DoFn.Context} of a new-style {@link DoFn} as an {@link OldDoFn} context.
+   */
+  private class AdaptedContext extends Context {
+
+    private final DoFn<InputT, OutputT>.Context delegate;
+
+    public AdaptedContext(DoFn<InputT, OutputT>.Context delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return delegate.getPipelineOptions();
+    }
+
+    @Override
+    public void output(OutputT output) {
+      delegate.output(output);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      delegate.outputWithTimestamp(output, timestamp);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      delegate.sideOutput(tag, output);
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      delegate.sideOutputWithTimestamp(tag, output, timestamp);
+    }
+
+    // No aggregator is created here: this adapter always hands back null.
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      return null;
+    }
+  }
+
+  /**
+   * Exposes the process context of a {@link DoFn} as an {@link OldDoFn} {@link ProcessContext}.
+   */
+  private class AdaptedProcessContext extends ProcessContext {
+
+    private final DoFn<InputT, OutputT>.ProcessContext newContext;
+
+    public AdaptedProcessContext(DoFn<InputT, OutputT>.ProcessContext newContext) {
+      this.newContext = newContext;
+    }
+
+    @Override
+    public InputT element() {
+      return newContext.element();
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      return newContext.sideInput(view);
+    }
+
+    @Override
+    public Instant timestamp() {
+      return newContext.timestamp();
+    }
+
+    // Not available through the adapter: always throws, naming window() in the message.
+    @Override
+    public BoundedWindow window() {
+      throw unsupported("window");
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return newContext.pane();
+    }
+
+    // Not available through the adapter: always throws.
+    @Override
+    public WindowingInternals<InputT, OutputT> windowingInternals() {
+      throw unsupported("windowingInternals");
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return newContext.getPipelineOptions();
+    }
+
+    @Override
+    public void output(OutputT output) {
+      newContext.output(output);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      newContext.outputWithTimestamp(output, timestamp);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      newContext.sideOutput(tag, output);
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      newContext.sideOutputWithTimestamp(tag, output, timestamp);
+    }
+
+    // No aggregator is created here: this adapter always hands back null.
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      return null;
+    }
+
+    /** Builds the exception thrown by the accessors this adapter cannot serve. */
+    private UnsupportedOperationException unsupported(String method) {
+      String oldName = OldDoFn.class.getSimpleName();
+      return new UnsupportedOperationException(String.format(
+          "%s.%s.%s() is no longer supported. Please convert your %s to a %s",
+          oldName, OldDoFn.ProcessContext.class.getSimpleName(), method,
+          oldName, DoFn.class.getSimpleName()));
+    }
+  }
+
+  /** A {@link DoFn} that forwards each of its methods to the enclosing {@link OldDoFn}. */
+  private class AdaptedDoFn extends DoFn<InputT, OutputT> {
+    @StartBundle
+    public void startBundle(DoFn.Context context) throws Exception {
+      OldDoFn.this.startBundle(new AdaptedContext(context));
+    }
+
+    @ProcessElement
+    public void processElement(DoFn.ProcessContext context) throws Exception {
+      OldDoFn.this.processElement(new AdaptedProcessContext(context));
+    }
+
+    @FinishBundle
+    public void finishBundle(DoFn.Context context) throws Exception {
+      OldDoFn.this.finishBundle(new AdaptedContext(context));
+    }
+
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      return OldDoFn.this.getAllowedTimestampSkew();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      OldDoFn.this.populateDisplayData(builder);
+    }
+
+    @Override
+    protected TypeDescriptor<InputT> getInputTypeDescriptor() {
+      return OldDoFn.this.getInputTypeDescriptor();
+    }
+
+    @Override
+    protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+      return OldDoFn.this.getOutputTypeDescriptor();
+    }
+  }
 }