You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 16:28:34 UTC

[16/50] incubator-beam git commit: Add adapter from OldDoFn.RequiresWindowAccess to DoFn

Add adapter from OldDoFn.RequiresWindowAccess to DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/164ee56b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/164ee56b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/164ee56b

Branch: refs/heads/apex-runner
Commit: 164ee56b41e01c0ee637eff24e23a814b5885e6f
Parents: a9a41eb
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Oct 23 19:45:18 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:50 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/OldDoFn.java | 89 +++++++++++++++++---
 1 file changed, 76 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/164ee56b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index e3cfc38..72c2965 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -77,19 +77,12 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
 
   public DoFn<InputT, OutputT> toDoFn() {
     if (this instanceof RequiresWindowAccess) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "Cannot convert %s to %s because it implements %s."
-                  + " Please convert your %s to a %s directly.",
-              getClass(),
-              DoFn.class.getSimpleName(),
-              RequiresWindowAccess.class.getSimpleName(),
-              OldDoFn.class.getSimpleName(),
-              DoFn.class.getSimpleName()));
-    }
-
-    // No parameters as it just accesses `this`
-    return new AdaptedDoFn();
+      // No parameters as it just accesses `this`
+      return new AdaptedRequiresWindowAccessDoFn();
+    } else {
+      // No parameters as it just accesses `this`
+      return new AdaptedDoFn();
+    }
   }
 
   /**
@@ -770,4 +763,74 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
       return OldDoFn.this.getOutputTypeDescriptor();
     }
   }
+
+  /**
+   * A {@link ProcessContext} for an {@link OldDoFn} that implements
+   * {@link OldDoFn.RequiresWindowAcccess}, via a context for a proper {@link DoFn}.
+   */
+  private class AdaptedRequiresWindowAccessProcessContext extends AdaptedProcessContext {
+
+    private final BoundedWindow window;
+
+    public AdaptedRequiresWindowAccessProcessContext(
+        DoFn<InputT, OutputT>.ProcessContext newContext,
+        BoundedWindow window) {
+      super(newContext);
+      this.window = window;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return window;
+    }
+  }
+
+  private class AdaptedRequiresWindowAccessDoFn extends DoFn<InputT, OutputT> {
+
+    @Setup
+    public void setup() throws Exception {
+      OldDoFn.this.setup();
+    }
+
+    @StartBundle
+    public void startBundle(Context c) throws Exception {
+      OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c));
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+      OldDoFn.this.processElement(
+          OldDoFn.this.new AdaptedRequiresWindowAccessProcessContext(c, window));
+    }
+
+    @FinishBundle
+    public void finishBundle(Context c) throws Exception {
+      OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c));
+    }
+
+    @Teardown
+    public void teardown() throws Exception {
+      OldDoFn.this.teardown();
+    }
+
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      return OldDoFn.this.getAllowedTimestampSkew();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      OldDoFn.this.populateDisplayData(builder);
+    }
+
+    @Override
+    protected TypeDescriptor<InputT> getInputTypeDescriptor() {
+      return OldDoFn.this.getInputTypeDescriptor();
+    }
+
+    @Override
+    protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+      return OldDoFn.this.getOutputTypeDescriptor();
+    }
+  }
 }