You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 16:28:34 UTC
[16/50] incubator-beam git commit: Add adapter from
OldDoFn.RequiresWindowAccess to DoFn
Add adapter from OldDoFn.RequiresWindowAccess to DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/164ee56b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/164ee56b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/164ee56b
Branch: refs/heads/apex-runner
Commit: 164ee56b41e01c0ee637eff24e23a814b5885e6f
Parents: a9a41eb
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Oct 23 19:45:18 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:50 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/OldDoFn.java | 89 +++++++++++++++++---
1 file changed, 76 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/164ee56b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index e3cfc38..72c2965 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -77,19 +77,12 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
public DoFn<InputT, OutputT> toDoFn() {
if (this instanceof RequiresWindowAccess) {
- throw new UnsupportedOperationException(
- String.format(
- "Cannot convert %s to %s because it implements %s."
- + " Please convert your %s to a %s directly.",
- getClass(),
- DoFn.class.getSimpleName(),
- RequiresWindowAccess.class.getSimpleName(),
- OldDoFn.class.getSimpleName(),
- DoFn.class.getSimpleName()));
- }
-
- // No parameters as it just accesses `this`
- return new AdaptedDoFn();
+ // No parameters as it just accesses `this`
+ return new AdaptedRequiresWindowAccessDoFn();
+ } else {
+ // No parameters as it just accesses `this`
+ return new AdaptedDoFn();
+ }
}
/**
@@ -770,4 +763,74 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
return OldDoFn.this.getOutputTypeDescriptor();
}
}
+
+ /**
+ * A {@link ProcessContext} for an {@link OldDoFn} that implements
+ * {@link OldDoFn.RequiresWindowAcccess}, via a context for a proper {@link DoFn}.
+ */
+ private class AdaptedRequiresWindowAccessProcessContext extends AdaptedProcessContext {
+
+ private final BoundedWindow window;
+
+ public AdaptedRequiresWindowAccessProcessContext(
+ DoFn<InputT, OutputT>.ProcessContext newContext,
+ BoundedWindow window) {
+ super(newContext);
+ this.window = window;
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return window;
+ }
+ }
+
+ private class AdaptedRequiresWindowAccessDoFn extends DoFn<InputT, OutputT> {
+
+ @Setup
+ public void setup() throws Exception {
+ OldDoFn.this.setup();
+ }
+
+ @StartBundle
+ public void startBundle(Context c) throws Exception {
+ OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c));
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+ OldDoFn.this.processElement(
+ OldDoFn.this.new AdaptedRequiresWindowAccessProcessContext(c, window));
+ }
+
+ @FinishBundle
+ public void finishBundle(Context c) throws Exception {
+ OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c));
+ }
+
+ @Teardown
+ public void teardown() throws Exception {
+ OldDoFn.this.teardown();
+ }
+
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return OldDoFn.this.getAllowedTimestampSkew();
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ OldDoFn.this.populateDisplayData(builder);
+ }
+
+ @Override
+ protected TypeDescriptor<InputT> getInputTypeDescriptor() {
+ return OldDoFn.this.getInputTypeDescriptor();
+ }
+
+ @Override
+ protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+ return OldDoFn.this.getOutputTypeDescriptor();
+ }
+ }
}