You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/15 22:29:01 UTC

[01/10] incubator-beam git commit: Converts all easy OldDoFns to DoFn

Repository: incubator-beam
Updated Branches:
  refs/heads/master 3e1a62815 -> 5a3ace4a7


Converts all easy OldDoFns to DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f5f329ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f5f329ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f5f329ee

Branch: refs/heads/master
Commit: f5f329eee4e4a446dafe15b1c42a8f0972360fbc
Parents: 3e1a628
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 16:17:46 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Dec 15 13:48:27 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    | 11 ++----
 .../FlattenPCollectionTranslatorTest.java       | 15 +++-----
 .../translation/GroupByKeyTranslatorTest.java   | 21 ++++-------
 .../translation/ParDoBoundTranslatorTest.java   | 39 ++++++++++----------
 .../translation/ReadUnboundTranslatorTest.java  | 15 +++-----
 .../apache/beam/runners/flink/FlinkRunner.java  | 10 ++---
 .../beam/runners/flink/PipelineOptionsTest.java | 11 +++---
 .../flink/streaming/DoFnOperatorTest.java       | 19 +++++-----
 .../flink/streaming/GroupByNullKeyTest.java     | 18 ++++-----
 .../streaming/TopWikipediaSessionsITCase.java   | 10 ++---
 10 files changed, 75 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 899efa3..e5bde46 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -22,13 +22,11 @@ import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.google.common.base.Throwables;
-
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.apex.api.EmbeddedAppLauncher;
 import org.apache.apex.api.Launcher;
 import org.apache.apex.api.Launcher.AppHandle;
@@ -45,7 +43,6 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -245,10 +242,10 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
   }
 
-  private static class WrapAsList<T> extends OldDoFn<T, List<T>> {
-    @Override
+  private static class WrapAsList<T> extends DoFn<T, List<T>> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
-      c.output(Arrays.asList(c.element()));
+      c.output(Collections.singletonList(c.element()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
index 6b62a58..f5abc34 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
@@ -19,12 +19,11 @@
 package org.apache.beam.runners.apex.translation;
 
 import com.google.common.collect.Sets;
-
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.ApexRunnerResult;
@@ -32,8 +31,8 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -83,14 +82,10 @@ public class FlattenPCollectionTranslatorTest {
     Assert.assertEquals(expected, Sets.newHashSet(EmbeddedCollector.RESULTS));
   }
 
-  @SuppressWarnings("serial")
-  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
-    protected static final ArrayList<Object> RESULTS = new ArrayList<>();
-
-    public EmbeddedCollector() {
-    }
+  private static class EmbeddedCollector extends DoFn<Object, Void> {
+    private static final List<Object> RESULTS = Collections.synchronizedList(new ArrayList<>());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       RESULTS.add(c.element());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
index d627cd9..96963a0 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.apex.translation;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
@@ -28,9 +27,8 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
-
+import java.util.Set;
 import javax.annotation.Nullable;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.ApexRunnerResult;
@@ -42,7 +40,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
@@ -106,22 +104,17 @@ public class GroupByKeyTranslatorTest {
 
   }
 
-  @SuppressWarnings("serial")
-  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
-    protected static final HashSet<Object> RESULTS = new HashSet<>();
-
-    public EmbeddedCollector() {
-    }
+  private static class EmbeddedCollector extends DoFn<Object, Void> {
+    private static final Set<Object> RESULTS = Collections.synchronizedSet(new HashSet<>());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       RESULTS.add(c.element());
     }
   }
 
-  private static class KeyedByTimestamp<T> extends OldDoFn<T, KV<Instant, T>> {
-
-    @Override
+  private static class KeyedByTimestamp<T> extends DoFn<T, KV<Instant, T>> {
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(KV.of(c.timestamp(), c.element()));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
index 2e86152..28b2ec9 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
@@ -26,14 +26,13 @@ import com.datatorrent.api.Sink;
 import com.datatorrent.lib.util.KryoCloneUtils;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.regex.Pattern;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.ApexRunnerResult;
@@ -49,7 +48,8 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
@@ -113,8 +113,7 @@ public class ParDoBoundTranslatorTest {
     Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
   }
 
-  @SuppressWarnings("serial")
-  private static class Add extends OldDoFn<Integer, Integer> {
+  private static class Add extends DoFn<Integer, Integer> {
     private Integer number;
     private PCollectionView<Integer> sideInputView;
 
@@ -126,7 +125,7 @@ public class ParDoBoundTranslatorTest {
       this.sideInputView = sideInputView;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       if (sideInputView != null) {
         number = c.sideInput(sideInputView);
@@ -135,15 +134,14 @@ public class ParDoBoundTranslatorTest {
     }
   }
 
-  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
-    private static final long serialVersionUID = 1L;
-    protected static final HashSet<Object> RESULTS = new HashSet<>();
+  private static class EmbeddedCollector extends DoFn<Object, Void> {
+    private static final Set<Object> RESULTS = Collections.synchronizedSet(new HashSet<>());
 
     public EmbeddedCollector() {
       RESULTS.clear();
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       RESULTS.add(c.element());
     }
@@ -207,13 +205,16 @@ public class ParDoBoundTranslatorTest {
     PCollectionView<Integer> singletonView = pipeline.apply(Create.of(1))
             .apply(Sum.integersGlobally().asSingletonView());
 
-    ApexParDoOperator<Integer, Integer> operator = new ApexParDoOperator<>(options,
-        new Add(singletonView), new TupleTag<Integer>(), TupleTagList.empty().getAll(),
-        WindowingStrategy.globalDefault(),
-        Collections.<PCollectionView<?>>singletonList(singletonView),
-        coder,
-        new ApexStateInternals.ApexStateInternalsFactory<Void>()
-        );
+    ApexParDoOperator<Integer, Integer> operator =
+        new ApexParDoOperator<>(
+            options,
+            DoFnAdapters.toOldDoFn(new Add(singletonView)),
+            new TupleTag<Integer>(),
+            TupleTagList.empty().getAll(),
+            WindowingStrategy.globalDefault(),
+            Collections.<PCollectionView<?>>singletonList(singletonView),
+            coder,
+            new ApexStateInternals.ApexStateInternalsFactory<Void>());
     operator.setup(null);
     operator.beginWindow(0);
     WindowedValue<Integer> wv1 = WindowedValue.valueInGlobalWindow(1);
@@ -303,7 +304,7 @@ public class ParDoBoundTranslatorTest {
      Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
   }
 
-  private static class TestMultiOutputWithSideInputsFn extends OldDoFn<Integer, String> {
+  private static class TestMultiOutputWithSideInputsFn extends DoFn<Integer, String> {
     private static final long serialVersionUID = 1L;
 
     final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
@@ -315,7 +316,7 @@ public class ParDoBoundTranslatorTest {
       this.sideOutputTupleTags.addAll(sideOutputTupleTags);
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       outputToAllWithSideInputs(c, "processing: " + c.element());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java
index 96ba663..8e44bab 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java
@@ -24,11 +24,10 @@ import com.google.common.collect.DiscreteDomain;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
-
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.ApexRunnerResult;
@@ -39,7 +38,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.junit.Assert;
 import org.junit.Test;
@@ -113,14 +112,10 @@ public class ReadUnboundTranslatorTest {
     Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
   }
 
-  @SuppressWarnings("serial")
-  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
-    protected static final HashSet<Object> RESULTS = new HashSet<>();
-
-    public EmbeddedCollector() {
-    }
+  private static class EmbeddedCollector extends DoFn<Object, Void> {
+    private static final Set<Object> RESULTS = Collections.synchronizedSet(new HashSet<>());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       RESULTS.add(c.element());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 7c1284b..5f92378 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -24,7 +24,7 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -42,7 +42,7 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -440,10 +440,10 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
     }
   }
 
-  private static class WrapAsList<T> extends OldDoFn<T, List<T>> {
-    @Override
+  private static class WrapAsList<T> extends DoFn<T, List<T>> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
-      c.output(Arrays.asList(c.element()));
+      c.output(Collections.singletonList(c.element()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 3c30fed..e44a705 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -29,7 +29,8 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -98,7 +99,7 @@ public class PipelineOptionsTest {
   @Test(expected = Exception.class)
   public void parDoBaseClassPipelineOptionsNullTest() {
     DoFnOperator<Object, Object, Object> doFnOperator = new DoFnOperator<>(
-        new TestDoFn(),
+        DoFnAdapters.toOldDoFn(new TestDoFn()),
         TypeInformation.of(new TypeHint<WindowedValue<Object>>() {}),
         new TupleTag<>("main-output"),
         Collections.<TupleTag<?>>emptyList(),
@@ -117,7 +118,7 @@ public class PipelineOptionsTest {
   public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception {
 
     DoFnOperator<Object, Object, Object> doFnOperator = new DoFnOperator<>(
-        new TestDoFn(),
+        DoFnAdapters.toOldDoFn(new TestDoFn()),
         TypeInformation.of(new TypeHint<WindowedValue<Object>>() {}),
         new TupleTag<>("main-output"),
         Collections.<TupleTag<?>>emptyList(),
@@ -151,9 +152,9 @@ public class PipelineOptionsTest {
   }
 
 
-  private static class TestDoFn extends OldDoFn<Object, Object> {
+  private static class TestDoFn extends DoFn<Object, Object> {
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       Assert.assertNotNull(c.getPipelineOptions());
       Assert.assertEquals(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 913fb8b..65e244a 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -25,7 +25,6 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-
 import java.util.Collections;
 import java.util.HashMap;
 import javax.annotation.Nullable;
@@ -35,6 +34,8 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PCollectionViewTesting;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -96,7 +97,7 @@ public class DoFnOperatorTest {
     TupleTag<String> outputTag = new TupleTag<>("main-output");
 
     DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
-        new IdentityDoFn<String>(),
+        DoFnAdapters.toOldDoFn(new IdentityDoFn<String>()),
         coderTypeInfo,
         outputTag,
         Collections.<TupleTag<?>>emptyList(),
@@ -140,7 +141,7 @@ public class DoFnOperatorTest {
         .build();
 
     DoFnOperator<String, String, RawUnionValue> doFnOperator = new DoFnOperator<>(
-        new MultiOutputDoFn(sideOutput1, sideOutput2),
+        DoFnAdapters.toOldDoFn(new MultiOutputDoFn(sideOutput1, sideOutput2)),
         coderTypeInfo,
         mainOutput,
         ImmutableList.<TupleTag<?>>of(sideOutput1, sideOutput2),
@@ -200,7 +201,7 @@ public class DoFnOperatorTest {
             .build();
 
     DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
-        new IdentityDoFn<String>(),
+        DoFnAdapters.toOldDoFn(new IdentityDoFn<String>()),
         coderTypeInfo,
         outputTag,
         Collections.<TupleTag<?>>emptyList(),
@@ -280,7 +281,7 @@ public class DoFnOperatorTest {
     });
   }
 
-  private static class MultiOutputDoFn extends OldDoFn<String, String> {
+  private static class MultiOutputDoFn extends DoFn<String, String> {
     private TupleTag<String> sideOutput1;
     private TupleTag<String> sideOutput2;
 
@@ -289,7 +290,7 @@ public class DoFnOperatorTest {
       this.sideOutput2 = sideOutput2;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       if (c.element().equals("one")) {
         c.sideOutput(sideOutput1, "side: one");
@@ -303,9 +304,9 @@ public class DoFnOperatorTest {
     }
   }
 
-  private static class IdentityDoFn<T> extends OldDoFn<T, T> {
-    @Override
-    public void processElement(OldDoFn<T, T>.ProcessContext c) throws Exception {
+  private static class IdentityDoFn<T> extends DoFn<T, T> {
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
       c.output(c.element());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index c6381ee..663b910 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -24,8 +24,8 @@ import org.apache.beam.runners.flink.FlinkTestPipeline;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -64,10 +64,8 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
   /**
    * DoFn extracting user and timestamp.
    */
-  public static class ExtractUserAndTimestamp extends OldDoFn<KV<Integer, String>, String> {
-    private static final long serialVersionUID = 0;
-
-    @Override
+  private static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       KV<Integer, String> record = c.element();
       int timestamp = record.getKey();
@@ -100,16 +98,16 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
               .withAllowedLateness(Duration.ZERO)
               .discardingFiredPanes())
 
-          .apply(ParDo.of(new OldDoFn<String, KV<Void, String>>() {
-            @Override
+          .apply(ParDo.of(new DoFn<String, KV<Void, String>>() {
+            @ProcessElement
             public void processElement(ProcessContext c) throws Exception {
               String elem = c.element();
-              c.output(KV.<Void, String>of((Void) null, elem));
+              c.output(KV.<Void, String>of(null, elem));
             }
           }))
           .apply(GroupByKey.<Void, String>create())
-          .apply(ParDo.of(new OldDoFn<KV<Void, Iterable<String>>, String>() {
-            @Override
+          .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() {
+            @ProcessElement
             public void processElement(ProcessContext c) throws Exception {
               KV<Void, Iterable<String>> elem = c.element();
               StringBuilder str = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5f329ee/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
index 9410481..9e6bba8 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -100,8 +100,8 @@ public class TopWikipediaSessionsITCase extends StreamingProgramTestBase impleme
 
 
 
-      .apply(ParDo.of(new OldDoFn<TableRow, String>() {
-        @Override
+      .apply(ParDo.of(new DoFn<TableRow, String>() {
+        @ProcessElement
         public void processElement(ProcessContext c) throws Exception {
           TableRow row = c.element();
           long timestamp = (Integer) row.get("timestamp");
@@ -117,8 +117,8 @@ public class TopWikipediaSessionsITCase extends StreamingProgramTestBase impleme
 
       .apply(Count.<String>perElement());
 
-    PCollection<String> format = output.apply(ParDo.of(new OldDoFn<KV<String, Long>, String>() {
-      @Override
+    PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         KV<String, Long> el = c.element();
         String out = "user: " + el.getKey() + " value:" + el.getValue();


[08/10] incubator-beam git commit: Moves DoFnAdapters to runners-core

Posted by ke...@apache.org.
Moves DoFnAdapters to runners-core


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/33ed3238
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/33ed3238
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/33ed3238

Branch: refs/heads/master
Commit: 33ed3238e2b3899cff061be3056c5cc29fc60a04
Parents: ca1dd7a
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 17:28:16 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Dec 15 13:59:11 2016 -0800

----------------------------------------------------------------------
 .../apex/translation/WindowBoundTranslator.java |   2 +-
 .../operators/ApexGroupByKeyOperator.java       |   2 +-
 .../operators/ApexParDoOperator.java            |   2 +-
 .../apache/beam/runners/core/DoFnAdapters.java  | 344 +++++++++++++++++++
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   4 +-
 .../core/GroupAlsoByWindowsProperties.java      |   2 +-
 .../functions/FlinkDoFnFunction.java            |   2 +-
 .../functions/FlinkMultiOutputDoFnFunction.java |   2 +-
 .../functions/FlinkProcessContextBase.java      |   2 +-
 .../wrappers/streaming/DoFnOperator.java        |   2 +-
 .../sdk/transforms/AggregatorRetriever.java     |  13 +-
 .../beam/sdk/transforms/DoFnAdapters.java       | 340 ------------------
 .../org/apache/beam/sdk/transforms/OldDoFn.java |   2 +-
 .../apache/beam/sdk/transforms/NoOpOldDoFn.java |   2 +-
 14 files changed, 367 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
index 33b9269..ef049e1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
@@ -22,8 +22,8 @@ import java.util.Collections;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
 import org.apache.beam.runners.core.AssignWindowsDoFn;
+import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 48ac177..4af7ff0 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -413,7 +413,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
     }
 
     @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
         String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
       throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 08f062d..1e76949 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -37,6 +37,7 @@ import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.NoOpStepContext;
 import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
+import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
@@ -48,7 +49,6 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.NullSideInputReader;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
new file mode 100644
index 0000000..0f5624f
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.IOException;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AggregatorRetriever;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Context;
+import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}.
+ *
+ * @deprecated This class will go away when we start running {@link DoFn DoFns} directly (using
+ *     {@link DoFnInvoker}) rather than via {@link OldDoFn}.
+ */
+@Deprecated
+public class DoFnAdapters {
+  /** Should not be instantiated. */
+  private DoFnAdapters() {}
+
+  /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
+    DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass());
+    if (signature.processElement().observesWindow()) {
+      return new WindowDoFnAdapter<>(fn);
+    } else {
+      return new SimpleDoFnAdapter<>(fn);
+    }
+  }
+
+  /**
+   * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
+   * OldDoFn}.
+   */
+  private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
+    private final DoFn<InputT, OutputT> fn;
+    private transient DoFnInvoker<InputT, OutputT> invoker;
+
+    SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
+      super(AggregatorRetriever.getDelegatingAggregators(fn));
+      this.fn = fn;
+      this.invoker = DoFnInvokers.invokerFor(fn);
+    }
+
+    @Override
+    public void setup() throws Exception {
+      this.invoker.invokeSetup();
+    }
+
+    @Override
+    public void startBundle(Context c) throws Exception {
+      fn.prepareForProcessing();
+      invoker.invokeStartBundle(new ContextAdapter<>(fn, c));
+    }
+
+    @Override
+    public void finishBundle(Context c) throws Exception {
+      invoker.invokeFinishBundle(new ContextAdapter<>(fn, c));
+    }
+
+    @Override
+    public void teardown() throws Exception {
+      this.invoker.invokeTeardown();
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
+      invoker.invokeProcessElement(adapter);
+    }
+
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      return fn.getAllowedTimestampSkew();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.delegate(fn);
+    }
+
+    private void readObject(java.io.ObjectInputStream in)
+        throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+      this.invoker = DoFnInvokers.invokerFor(fn);
+    }
+  }
+
+  /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */
+  private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT>
+      implements OldDoFn.RequiresWindowAccess {
+
+    WindowDoFnAdapter(DoFn<InputT, OutputT> fn) {
+      super(fn);
+    }
+  }
+
+  /**
+   * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link
+   * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is
+   * unavailable.
+   */
+  private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private OldDoFn<InputT, OutputT>.Context context;
+
+    private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
+      fn.super();
+      this.context = context;
+      super.setupDelegateAggregators();
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public void output(OutputT output) {
+      context.output(output);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      context.outputWithTimestamp(output, timestamp);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      context.sideOutput(tag, output);
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      context.sideOutputWithTimestamp(tag, output, timestamp);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+        String name,
+        CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      return context.createAggregatorInternal(name, combiner);
+    }
+
+    @Override
+    public BoundedWindow window() {
+      // The OldDoFn doesn't allow us to ask for the window outside processElement, so this
+      // should be unreachable.
+      throw new UnsupportedOperationException(
+          "Can only get the window in processElement; elsewhere there is no defined window.");
+    }
+
+    @Override
+    public Context context(DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Can only get a ProcessContext in processElement");
+    }
+
+    @Override
+    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Timers are not supported for OldDoFn");
+    }
+
+    @Override
+    public DoFn.InputProvider<InputT> inputProvider() {
+      throw new UnsupportedOperationException("inputProvider() exists only for testing");
+    }
+
+    @Override
+    public DoFn.OutputReceiver<OutputT> outputReceiver() {
+      throw new UnsupportedOperationException("outputReceiver() exists only for testing");
+    }
+
+    @Override
+    public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+      throw new UnsupportedOperationException("This is a non-splittable DoFn");
+    }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException("State is not supported by this runner");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException("Timers are not supported by this runner");
+    }
+  }
+
+  /**
+   * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider}.
+   */
+  private static class ProcessContextAdapter<InputT, OutputT>
+      extends DoFn<InputT, OutputT>.ProcessContext
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private OldDoFn<InputT, OutputT>.ProcessContext context;
+
+    private ProcessContextAdapter(
+        DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) {
+      fn.super();
+      this.context = context;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      return context.sideInput(view);
+    }
+
+    @Override
+    public void output(OutputT output) {
+      context.output(output);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      context.outputWithTimestamp(output, timestamp);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      context.sideOutput(tag, output);
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      context.sideOutputWithTimestamp(tag, output, timestamp);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      return context.createAggregatorInternal(name, combiner);
+    }
+
+    @Override
+    public InputT element() {
+      return context.element();
+    }
+
+    @Override
+    public Instant timestamp() {
+      return context.timestamp();
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return context.pane();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return context.window();
+    }
+
+    @Override
+    public Context context(DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException("Timers are not supported for OldDoFn");
+    }
+
+    @Override
+    public DoFn.InputProvider<InputT> inputProvider() {
+      throw new UnsupportedOperationException("inputProvider() exists only for testing");
+    }
+
+    @Override
+    public DoFn.OutputReceiver<OutputT> outputReceiver() {
+      throw new UnsupportedOperationException("outputReceiver() exists only for testing");
+    }
+
+    @Override
+    public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+      throw new UnsupportedOperationException("This is a non-splittable DoFn");
+    }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException("State is not supported by this runner");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException("Timers are not supported by this runner");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 73286ad..10af29a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -323,7 +323,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
     }
 
     @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
         String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
       checkNotNull(combiner, "Combiner passed to createAggregatorInternal cannot be null");
       return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner);
@@ -505,7 +505,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
     }
 
     @Override
-    protected <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
+    public <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
         createAggregatorInternal(
             String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
       return context.createAggregatorInternal(name, combiner);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index 97b67c6..ef01106 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -744,7 +744,7 @@ public class GroupAlsoByWindowsProperties {
     }
 
     @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
         String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
       throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index ed200d5..2a4a68e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -18,10 +18,10 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Map;
+import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index 7f6a436..a97bd46 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -18,10 +18,10 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Map;
+import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
index 6afca38..53b9803 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
@@ -252,7 +252,7 @@ abstract class FlinkProcessContextBase<InputT, OutputT>
   public abstract <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp);
 
   @Override
-  protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+  public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
   createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
     @SuppressWarnings("unchecked")
     SerializableFnAggregatorWrapper<AggInputT, AggOutputT> result =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 6729aaa..87b15a7 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -28,6 +28,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
@@ -40,7 +41,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
index ce47e22..b1d3ead 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
@@ -18,9 +18,10 @@
 package org.apache.beam.sdk.transforms;
 
 import java.util.Collection;
+import java.util.Map;
 
 /**
- * An internal class for extracting {@link Aggregator Aggregators} from {@link OldDoFn DoFns}.
+ * An internal class for extracting {@link Aggregator Aggregators} from {@link DoFn DoFns}.
  */
 public final class AggregatorRetriever {
   private AggregatorRetriever() {
@@ -28,9 +29,17 @@ public final class AggregatorRetriever {
   }
 
   /**
-   * Returns the {@link Aggregator Aggregators} created by the provided {@link OldDoFn}.
+   * Returns the {@link Aggregator Aggregators} created by the provided {@link DoFn}.
    */
   public static Collection<Aggregator<?, ?>> getAggregators(DoFn<?, ?> fn) {
     return fn.getAggregators();
   }
+
+  /**
+   * Returns the {@link DelegatingAggregator delegating aggregators} created by the provided {@link
+   * DoFn}.
+   */
+  public static Map<String, DelegatingAggregator<?, ?>> getDelegatingAggregators(DoFn<?, ?> fn) {
+    return fn.aggregators;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
deleted file mode 100644
index 0a71faa..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import java.io.IOException;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn.Context;
-import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}.
- *
- * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link
- *     DoFnInvoker}) rather than via {@link OldDoFn}.
- */
-@Deprecated
-public class DoFnAdapters {
-  /** Should not be instantiated. */
-  private DoFnAdapters() {}
-
-  /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
-    DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass());
-    if (signature.processElement().observesWindow()) {
-      return new WindowDoFnAdapter<>(fn);
-    } else {
-      return new SimpleDoFnAdapter<>(fn);
-    }
-  }
-
-  /**
-   * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
-   * OldDoFn}.
-   */
-  private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
-    private final DoFn<InputT, OutputT> fn;
-    private transient DoFnInvoker<InputT, OutputT> invoker;
-
-    SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
-      super(fn.aggregators);
-      this.fn = fn;
-      this.invoker = DoFnInvokers.invokerFor(fn);
-    }
-
-    @Override
-    public void setup() throws Exception {
-      this.invoker.invokeSetup();
-    }
-
-    @Override
-    public void startBundle(Context c) throws Exception {
-      fn.prepareForProcessing();
-      invoker.invokeStartBundle(new ContextAdapter<>(fn, c));
-    }
-
-    @Override
-    public void finishBundle(Context c) throws Exception {
-      invoker.invokeFinishBundle(new ContextAdapter<>(fn, c));
-    }
-
-    @Override
-    public void teardown() throws Exception {
-      this.invoker.invokeTeardown();
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
-      invoker.invokeProcessElement(adapter);
-    }
-
-    @Override
-    public Duration getAllowedTimestampSkew() {
-      return fn.getAllowedTimestampSkew();
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(fn);
-    }
-
-    private void readObject(java.io.ObjectInputStream in)
-        throws IOException, ClassNotFoundException {
-      in.defaultReadObject();
-      this.invoker = DoFnInvokers.invokerFor(fn);
-    }
-  }
-
-  /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */
-  private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT>
-      implements OldDoFn.RequiresWindowAccess {
-
-    WindowDoFnAdapter(DoFn<InputT, OutputT> fn) {
-      super(fn);
-    }
-  }
-
-  /**
-   * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link
-   * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is
-   * unavailable.
-   */
-  private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
-      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
-    private OldDoFn<InputT, OutputT>.Context context;
-
-    private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
-      fn.super();
-      this.context = context;
-      super.setupDelegateAggregators();
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      context.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      context.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
-        String name,
-        CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return context.createAggregatorInternal(name, combiner);
-    }
-
-    @Override
-    public BoundedWindow window() {
-      // The OldDoFn doesn't allow us to ask for these outside processElement, so this
-      // should be unreachable.
-      throw new UnsupportedOperationException(
-          "Can only get the window in processElement; elsewhere there is no defined window.");
-    }
-
-    @Override
-    public Context context(DoFn<InputT, OutputT> doFn) {
-      return this;
-    }
-
-    @Override
-    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Can only get a ProcessContext in processElement");
-    }
-
-    @Override
-    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Timers are not supported for OldDoFn");
-    }
-
-    @Override
-    public DoFn.InputProvider<InputT> inputProvider() {
-      throw new UnsupportedOperationException("inputProvider() exists only for testing");
-    }
-
-    @Override
-    public DoFn.OutputReceiver<OutputT> outputReceiver() {
-      throw new UnsupportedOperationException("outputReceiver() exists only for testing");
-    }
-
-    @Override
-    public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
-      throw new UnsupportedOperationException("This is a non-splittable DoFn");
-    }
-
-    @Override
-    public State state(String stateId) {
-      throw new UnsupportedOperationException("State is not supported by this runner");
-    }
-
-    @Override
-    public Timer timer(String timerId) {
-      throw new UnsupportedOperationException("Timers are not supported by this runner");
-    }
-  }
-
-  /**
-   * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method.
-   */
-  private static class ProcessContextAdapter<InputT, OutputT>
-      extends DoFn<InputT, OutputT>.ProcessContext
-      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
-    private OldDoFn<InputT, OutputT>.ProcessContext context;
-
-    private ProcessContextAdapter(
-        DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) {
-      fn.super();
-      this.context = context;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      return context.sideInput(view);
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      context.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      context.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return context.createAggregatorInternal(name, combiner);
-    }
-
-    @Override
-    public InputT element() {
-      return context.element();
-    }
-
-    @Override
-    public Instant timestamp() {
-      return context.timestamp();
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return context.pane();
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return context.window();
-    }
-
-    @Override
-    public Context context(DoFn<InputT, OutputT> doFn) {
-      return this;
-    }
-
-    @Override
-    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
-      return this;
-    }
-
-    @Override
-    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException("Timers are not supported for OldDoFn");
-    }
-
-    @Override
-    public DoFn.InputProvider<InputT> inputProvider() {
-      throw new UnsupportedOperationException("inputProvider() exists only for testing");
-    }
-
-    @Override
-    public DoFn.OutputReceiver<OutputT> outputReceiver() {
-      throw new UnsupportedOperationException("outputReceiver() exists only for testing");
-    }
-
-    @Override
-    public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
-      throw new UnsupportedOperationException("This is a non-splittable DoFn");
-    }
-
-    @Override
-    public State state(String stateId) {
-      throw new UnsupportedOperationException("State is not supported by this runner");
-    }
-
-    @Override
-    public Timer timer(String timerId) {
-      throw new UnsupportedOperationException("Timers are not supported by this runner");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index 0aef552..7b04533 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -192,7 +192,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
      *         context
      */
     @Experimental(Kind.AGGREGATOR)
-    protected abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+    public abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
         createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33ed3238/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
index 504480b..0db130d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
@@ -63,7 +63,7 @@ class NoOpOldDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
         Instant timestamp) {
     }
     @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
         createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
       return null;
     }



[06/10] incubator-beam git commit: Removes unused code from NoOpOldDoFn

Posted by ke...@apache.org.
Removes unused code from NoOpOldDoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ca1dd7a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ca1dd7a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ca1dd7a3

Branch: refs/heads/master
Commit: ca1dd7a3a4a01e2696a8131809c5798aef55d6a0
Parents: f3e8a03
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 17:24:23 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Dec 15 13:58:43 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/NoOpOldDoFn.java | 72 --------------------
 1 file changed, 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca1dd7a3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
index 302b66a..504480b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
@@ -19,10 +19,6 @@ package org.apache.beam.sdk.transforms;
 
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
 
@@ -46,13 +42,6 @@ class NoOpOldDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
   }
 
   /**
-   * Returns a new NoOp Process Context.
-   */
-  public OldDoFn<InputT, OutputT>.ProcessContext processContext() {
-    return new NoOpDoFnProcessContext();
-  }
-
-  /**
    * A {@link OldDoFn.Context} that does nothing and returns exclusively null.
    */
   private class NoOpDoFnContext extends OldDoFn<InputT, OutputT>.Context {
@@ -79,65 +68,4 @@ class NoOpOldDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
       return null;
     }
   }
-
-  /**
-   * A {@link OldDoFn.ProcessContext} that does nothing and returns exclusively
-   * null.
-   */
-  private class NoOpDoFnProcessContext extends OldDoFn<InputT, OutputT>.ProcessContext {
-    @Override
-    public InputT element() {
-      return null;
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      return null;
-    }
-
-    @Override
-    public Instant timestamp() {
-      return null;
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return null;
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return null;
-    }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return null;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return null;
-    }
-
-    @Override
-    public void output(OutputT output) {}
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {}
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {}
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output,
-        Instant timestamp) {}
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-        createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return null;
-    }
-
-  }
 }


[05/10] incubator-beam git commit: Removes OldDoFn from ParDo

Posted by ke...@apache.org.
Removes OldDoFn from ParDo


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e9e53c5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e9e53c5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e9e53c5d

Branch: refs/heads/master
Commit: e9e53c5d037561aa4dcacfcde69d76a03f3a1571
Parents: 8330bfa
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 17:13:43 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Dec 15 13:58:43 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/ParDo.java   | 167 +++----------------
 .../apache/beam/sdk/transforms/OldDoFnTest.java | 125 ++++----------
 2 files changed, 55 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e9e53c5d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 167f5fa..d2149c0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -530,23 +529,6 @@ public class ParDo {
     return new Unbound().of(fn, displayDataForFn(fn));
   }
 
-  /**
-   * Creates a {@link ParDo} {@link PTransform} that will invoke the
-   * given {@link OldDoFn} function.
-   *
-   * <p>The resulting {@link PTransform PTransform's} types have been bound, with the
-   * input being a {@code PCollection<InputT>} and the output a
-   * {@code PCollection<OutputT>}, inferred from the types of the argument
-   * {@code OldDoFn<InputT, OutputT>}. It is ready to be applied, or further
-   * properties can be set on it first.
-   *
-   * @deprecated please port your {@link OldDoFn} to a {@link DoFn}
-   */
-  @Deprecated
-  public static <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
-    return new Unbound().of(fn, displayDataForFn(fn));
-  }
-
   private static <T> DisplayData.ItemSpec<? extends Class<?>> displayDataForFn(T fn) {
     return DisplayData.item("fn", fn.getClass()).withLabel("Transform Function");
   }
@@ -557,12 +539,7 @@ public class ParDo {
    * the {@link PCollection}.
    */
   private static <InputT, OutputT> void validateWindowType(
-      PCollection<? extends InputT> input, Serializable fn) {
-    // No validation for OldDoFn
-    if (!(fn instanceof DoFn)) {
-      return;
-    }
-
+      PCollection<? extends InputT> input, DoFn<InputT, OutputT> fn) {
     DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass());
 
     TypeDescriptor<? extends BoundedWindow> actualWindowT =
@@ -609,10 +586,6 @@ public class ParDo {
     }
   }
 
-  private static <InputT, OutputT> OldDoFn<InputT, OutputT> adapt(DoFn<InputT, OutputT> fn) {
-    return DoFnAdapters.toOldDoFn(fn);
-  }
-
   /**
    * An incomplete {@link ParDo} transform, with unbound input/output types.
    *
@@ -688,24 +661,9 @@ public class ParDo {
       return new UnboundMulti<>(name, sideInputs, mainOutputTag, sideOutputTags);
     }
 
-    /**
-     * Returns a new {@link ParDo} {@link PTransform} that's like this
-     * transform but that will invoke the given {@link OldDoFn}
-     * function, and that has its input and output types bound. Does
-     * not modify this transform. The resulting {@link PTransform} is
-     * sufficiently specified to be applied, but more properties can
-     * still be specified.
-     *
-     * @deprecated please port your {@link OldDoFn} to a {@link DoFn}
-     */
-    @Deprecated
-    public <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> oldFn) {
-      return of(oldFn, displayDataForFn(oldFn));
-    }
-
     private <InputT, OutputT> Bound<InputT, OutputT> of(
-        Serializable originalFn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
-      return new Bound<>(name, originalFn, sideInputs, fnDisplayData);
+        DoFn<InputT, OutputT> doFn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
+      return new Bound<>(name, doFn, sideInputs, fnDisplayData);
     }
   }
 
@@ -725,12 +683,12 @@ public class ParDo {
       extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
     // Inherits name.
     private final List<PCollectionView<?>> sideInputs;
-    private final Serializable fn;
+    private final DoFn<InputT, OutputT> fn;
     private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
 
     Bound(
         String name,
-        Serializable fn,
+        DoFn<InputT, OutputT> fn,
         List<PCollectionView<?>> sideInputs,
         DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
       super(name);
@@ -787,7 +745,7 @@ public class ParDo {
     @Override
     public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
       checkArgument(
-          !isSplittable(getOldFn()),
+          !isSplittable(getNewFn()),
           "%s does not support Splittable DoFn",
           input.getPipeline().getOptions().getRunner().getName());
       validateWindowType(input, fn);
@@ -795,7 +753,7 @@ public class ParDo {
               input.getPipeline(),
               input.getWindowingStrategy(),
               input.isBounded())
-          .setTypeDescriptor(getOldFn().getOutputTypeDescriptor());
+          .setTypeDescriptor(getNewFn().getOutputTypeDescriptor());
     }
 
     @Override
@@ -803,14 +761,14 @@ public class ParDo {
     protected Coder<OutputT> getDefaultOutputCoder(PCollection<? extends InputT> input)
         throws CannotProvideCoderException {
       return input.getPipeline().getCoderRegistry().getDefaultCoder(
-          getOldFn().getOutputTypeDescriptor(),
-          getOldFn().getInputTypeDescriptor(),
+          getNewFn().getOutputTypeDescriptor(),
+          getNewFn().getInputTypeDescriptor(),
           ((PCollection<InputT>) input).getCoder());
     }
 
     @Override
     protected String getKindString() {
-      Class<?> clazz = DoFnAdapters.getDoFnClass(getOldFn());
+      Class<?> clazz = getNewFn().getClass();
       if (clazz.isAnonymousClass()) {
         return "AnonymousParDo";
       } else {
@@ -831,44 +789,7 @@ public class ParDo {
       ParDo.populateDisplayData(builder, (HasDisplayData) fn, fnDisplayData);
     }
 
-    /**
-     * @deprecated this method to be converted to return {@link DoFn}. If you want to receive
-     * an {@link OldDoFn} you should (temporarily) use {@link #getOldFn}.
-     */
-    @Deprecated
-    public OldDoFn<InputT, OutputT> getFn() {
-      return getOldFn();
-    }
-
-    /**
-     * @deprecated please migrate to {@link #getNewFn} until {@link #getFn} is migrated to return
-     * a {@link DoFn}.
-     */
-    @Deprecated
-    public OldDoFn<InputT, OutputT> getOldFn() {
-      if (fn instanceof OldDoFn) {
-        return (OldDoFn<InputT, OutputT>) fn;
-      } else {
-        return adapt((DoFn<InputT, OutputT>) fn);
-      }
-    }
-
     public DoFn<InputT, OutputT> getNewFn() {
-      if (fn instanceof DoFn) {
-        return (DoFn<InputT, OutputT>) fn;
-      } else {
-        return ((OldDoFn<InputT, OutputT>) fn).toDoFn();
-      }
-    }
-
-    /**
-     * Returns the {@link OldDoFn} or {@link DoFn} used to create this transform.
-     *
-     * @deprecated for migration purposes only. There are some cases of {@link OldDoFn} that are not
-     *     fully supported by wrapping it into a {@link DoFn}, such as {@link RequiresWindowAccess}.
-     */
-    @Deprecated
-    public Object getOriginalFn() {
       return fn;
     }
 
@@ -951,23 +872,8 @@ public class ParDo {
       return of(fn, displayDataForFn(fn));
     }
 
-    /**
-     * Returns a new multi-output {@link ParDo} {@link PTransform}
-     * that's like this transform but that will invoke the given
-     * {@link OldDoFn} function, and that has its input type bound.
-     * Does not modify this transform. The resulting
-     * {@link PTransform} is sufficiently specified to be applied, but
-     * more properties can still be specified.
-     *
-     * @deprecated please port your {@link OldDoFn} to a {@link DoFn}
-     */
-    @Deprecated
-    public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
-      return of(fn, displayDataForFn(fn));
-    }
-
     private <InputT> BoundMulti<InputT, OutputT> of(
-        Serializable fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
+        DoFn<InputT, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
       return new BoundMulti<>(name, fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData);
     }
   }
@@ -990,11 +896,11 @@ public class ParDo {
     private final TupleTag<OutputT> mainOutputTag;
     private final TupleTagList sideOutputTags;
     private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
-    private final Serializable fn;
+    private final DoFn<InputT, OutputT> fn;
 
     BoundMulti(
         String name,
-        Serializable fn,
+        DoFn<InputT, OutputT> fn,
         List<PCollectionView<?>> sideInputs,
         TupleTag<OutputT> mainOutputTag,
         TupleTagList sideOutputTags,
@@ -1046,7 +952,7 @@ public class ParDo {
     @Override
     public PCollectionTuple expand(PCollection<? extends InputT> input) {
       checkArgument(
-          !isSplittable(getOldFn()),
+          !isSplittable(getNewFn()),
           "%s does not support Splittable DoFn",
           input.getPipeline().getOptions().getRunner().getName());
       validateWindowType(input, fn);
@@ -1059,7 +965,7 @@ public class ParDo {
       // The fn will likely be an instance of an anonymous subclass
       // such as DoFn<Integer, String> { }, thus will have a high-fidelity
       // TypeDescriptor for the output type.
-      outputs.get(mainOutputTag).setTypeDescriptor(getOldFn().getOutputTypeDescriptor());
+      outputs.get(mainOutputTag).setTypeDescriptor(getNewFn().getOutputTypeDescriptor());
 
       return outputs;
     }
@@ -1084,7 +990,7 @@ public class ParDo {
 
     @Override
     protected String getKindString() {
-      Class<?> clazz = DoFnAdapters.getDoFnClass(getOldFn());
+      Class<?> clazz = getNewFn().getClass();
       if (clazz.isAnonymousClass()) {
         return "AnonymousParMultiDo";
       } else {
@@ -1095,37 +1001,11 @@ public class ParDo {
     @Override
     public void populateDisplayData(Builder builder) {
       super.populateDisplayData(builder);
-      ParDo.populateDisplayData(builder, (HasDisplayData) fn, fnDisplayData);
-    }
-
-    /**
-     * @deprecated this method to be converted to return {@link DoFn}. If you want to receive
-     * an {@link OldDoFn} you should (temporarily) use {@link #getOldFn}.
-     */
-    @Deprecated
-    public OldDoFn<InputT, OutputT> getFn() {
-      return getOldFn();
-    }
-
-    /**
-     * @deprecated please migrate to {@link #getNewFn} until {@link #getFn} is migrated to return
-     * a {@link DoFn}.
-     */
-    @Deprecated
-    public OldDoFn<InputT, OutputT> getOldFn() {
-      if (fn instanceof OldDoFn) {
-        return (OldDoFn<InputT, OutputT>) fn;
-      } else {
-        return adapt((DoFn<InputT, OutputT>) fn);
-      }
+      ParDo.populateDisplayData(builder, fn, fnDisplayData);
     }
 
     public DoFn<InputT, OutputT> getNewFn() {
-      if (fn instanceof DoFn) {
-        return (DoFn<InputT, OutputT>) fn;
-      } else {
-        return ((OldDoFn<InputT, OutputT>) fn).toDoFn();
-      }
+      return fn;
     }
 
     public TupleTag<OutputT> getMainOutputTag() {
@@ -1148,14 +1028,7 @@ public class ParDo {
     builder.include("fn", fn).add(fnDisplayData);
   }
 
-  private static boolean isSplittable(OldDoFn<?, ?> oldDoFn) {
-    DoFn<?, ?> fn = DoFnAdapters.getDoFn(oldDoFn);
-    if (fn == null) {
-      return false;
-    }
-    return DoFnSignatures
-        .getSignature(fn.getClass())
-        .processElement()
-        .isSplittable();
+  private static boolean isSplittable(DoFn<?, ?> fn) {
+    return DoFnSignatures.signatureForDoFn(fn).processElement().isSplittable();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e9e53c5d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
index 07e3078..cc84252 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
@@ -18,28 +18,20 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
-import java.util.Map;
-import org.apache.beam.sdk.AggregatorValues;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
-import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -134,68 +126,52 @@ public class OldDoFnTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
-  public void testCreateAggregatorInStartBundleThrows() {
-    TestPipeline p = createTestPipeline(new OldDoFn<String, String>() {
+  public void testCreateAggregatorThrowsWhenAggregatorsAreFinal() throws Exception {
+    OldDoFn<String, String> fn = new OldDoFn<String, String>() {
       @Override
-      public void startBundle(OldDoFn<String, String>.Context c) throws Exception {
-        createAggregator("anyAggregate", new MaxIntegerFn());
-      }
-
-      @Override
-      public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception {}
-    });
-
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectCause(isA(IllegalStateException.class));
+      public void processElement(ProcessContext c) throws Exception { }
+    };
+    OldDoFn<String, String>.Context context = createContext(fn);
+    context.setupDelegateAggregators();
 
-    p.run();
+    thrown.expect(isA(IllegalStateException.class));
+    fn.createAggregator("anyAggregate", new MaxIntegerFn());
   }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testCreateAggregatorInProcessElementThrows() {
-    TestPipeline p = createTestPipeline(new OldDoFn<String, String>() {
+  private OldDoFn<String, String>.Context createContext(OldDoFn<String, String> fn) {
+    return fn.new Context() {
       @Override
-      public void processElement(ProcessContext c) throws Exception {
-        createAggregator("anyAggregate", new MaxIntegerFn());
+      public PipelineOptions getPipelineOptions() {
+        throw new UnsupportedOperationException();
       }
-    });
-
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectCause(isA(IllegalStateException.class));
-
-    p.run();
-  }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testCreateAggregatorInFinishBundleThrows() {
-    TestPipeline p = createTestPipeline(new OldDoFn<String, String>() {
       @Override
-      public void finishBundle(OldDoFn<String, String>.Context c) throws Exception {
-        createAggregator("anyAggregate", new MaxIntegerFn());
+      public void output(String output) {
+        throw new UnsupportedOperationException();
       }
 
       @Override
-      public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception {}
-    });
-
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectCause(isA(IllegalStateException.class));
+      public void outputWithTimestamp(String output, Instant timestamp) {
+        throw new UnsupportedOperationException();
+      }
 
-    p.run();
-  }
+      @Override
+      public <T> void sideOutput(TupleTag<T> tag, T output) {
+        throw new UnsupportedOperationException();
+      }
 
-  /**
-   * Initialize a test pipeline with the specified {@link OldDoFn}.
-   */
-  private <InputT, OutputT> TestPipeline createTestPipeline(OldDoFn<InputT, OutputT> fn) {
-    TestPipeline pipeline = TestPipeline.create();
-    pipeline.apply(Create.of((InputT) null))
-     .apply(ParDo.of(fn));
+      @Override
+      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+        throw new UnsupportedOperationException();
+      }
 
-    return pipeline;
+      @Override
+      public <AggInputT, AggOutputT>
+      Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+              String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+        throw new UnsupportedOperationException();
+      }
+    };
   }
 
   @Test
@@ -209,35 +185,4 @@ public class OldDoFnTest implements Serializable {
     DisplayData data = DisplayData.from(usesDefault);
     assertThat(data.items(), empty());
   }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testAggregators() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-
-    CountOddsFn countOdds = new CountOddsFn();
-    PCollection<Void> output = pipeline
-        .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100))
-        .apply(ParDo.of(countOdds));
-    PipelineResult result = pipeline.run();
-
-    AggregatorValues<Integer> values = result.getAggregatorValues(countOdds.aggregator);
-
-    Map<String, Integer> valuesMap = values.getValuesAtSteps();
-
-    assertThat(valuesMap.size(), equalTo(1));
-    assertThat(valuesMap.get(output.getProducingTransformInternal().getFullName()), equalTo(4));
-  }
-
-  private static class CountOddsFn extends OldDoFn<Integer, Void> {
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      if (c.element() % 2 == 1) {
-        aggregator.addValue(1);
-      }
-    }
-
-    Aggregator<Integer, Integer> aggregator =
-        createAggregator("odds", new SumIntegerFn());
-  }
 }


[02/10] incubator-beam git commit: Pushes uses of OldDoFn deeper inside Flink runner

Posted by ke...@apache.org.
Pushes uses of OldDoFn deeper inside Flink runner

In particular, the various DoFnOperators now take a regular DoFn
rather than an OldDoFn, and convert it to an OldDoFn internally.

This allows removing uses of ParDo.getFn() returning OldDoFn.

The only case where the OldDoFn inside a DoFnOperator is actually an
OldDoFn rather than DoFn in disguise is now WindowDoFnOperator, which
overrides getDoFn to return an actual GABW OldDoFn.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8330bfa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8330bfa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8330bfa7

Branch: refs/heads/master
Commit: 8330bfa74cd72e51a29649745e87a4f1a6e5ffa1
Parents: af616d9
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 16:47:01 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Dec 15 13:55:24 2016 -0800

----------------------------------------------------------------------
 .../FlinkBatchTransformTranslators.java         |  9 +---
 .../FlinkStreamingTransformTranslators.java     |  8 ++--
 .../functions/FlinkDoFnFunction.java            | 10 +++--
 .../functions/FlinkMultiOutputDoFnFunction.java | 10 +++--
 .../wrappers/streaming/DoFnOperator.java        | 43 ++++++++++++++++----
 .../wrappers/streaming/WindowDoFnOperator.java  |  8 ++--
 .../beam/runners/flink/PipelineOptionsTest.java |  5 +--
 .../flink/streaming/DoFnOperatorTest.java       |  8 ++--
 8 files changed, 63 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 9ac907f..497b293 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -50,7 +50,6 @@ import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -523,8 +522,6 @@ class FlinkBatchTransformTranslators {
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
-      final OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
-
       TypeInformation<WindowedValue<OutputT>> typeInformation =
           context.getTypeInfo(context.getOutput(transform));
 
@@ -539,7 +536,7 @@ class FlinkBatchTransformTranslators {
 
       FlinkDoFnFunction<InputT, OutputT> doFnWrapper =
           new FlinkDoFnFunction<>(
-              oldDoFn,
+              doFn,
               context.getOutput(transform).getWindowingStrategy(),
               sideInputStrategies,
               context.getPipelineOptions());
@@ -570,8 +567,6 @@ class FlinkBatchTransformTranslators {
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
-      final OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
-
       Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
 
       Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
@@ -618,7 +613,7 @@ class FlinkBatchTransformTranslators {
       @SuppressWarnings("unchecked")
       FlinkMultiOutputDoFnFunction<InputT, OutputT> doFnWrapper =
           new FlinkMultiOutputDoFnFunction(
-              oldDoFn,
+              doFn,
               windowingStrategy,
               sideInputStrategies,
               context.getPipelineOptions(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 042f8df..42ef630 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -358,7 +358,7 @@ public class FlinkStreamingTransformTranslators {
       if (sideInputs.isEmpty()) {
         DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
             new DoFnOperator<>(
-                transform.getFn(),
+                transform.getNewFn(),
                 inputTypeInfo,
                 new TupleTag<OutputT>("main output"),
                 Collections.<TupleTag<?>>emptyList(),
@@ -381,7 +381,7 @@ public class FlinkStreamingTransformTranslators {
 
         DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
             new DoFnOperator<>(
-                transform.getFn(),
+                transform.getNewFn(),
                 inputTypeInfo,
                 new TupleTag<OutputT>("main output"),
                 Collections.<TupleTag<?>>emptyList(),
@@ -515,7 +515,7 @@ public class FlinkStreamingTransformTranslators {
       if (sideInputs.isEmpty()) {
         DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
             new DoFnOperator<>(
-                transform.getFn(),
+                transform.getNewFn(),
                 inputTypeInfo,
                 transform.getMainOutputTag(),
                 transform.getSideOutputTags().getAll(),
@@ -542,7 +542,7 @@ public class FlinkStreamingTransformTranslators {
 
         DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
             new DoFnOperator<>(
-                transform.getFn(),
+                transform.getNewFn(),
                 inputTypeInfo,
                 transform.getMainOutputTag(),
                 transform.getSideOutputTags().getAll(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index db045f5..ed200d5 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -20,7 +20,10 @@ package org.apache.beam.runners.flink.translation.functions;
 import java.util.Map;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -46,16 +49,17 @@ public class FlinkDoFnFunction<InputT, OutputT>
   private final WindowingStrategy<?, ?> windowingStrategy;
 
   public FlinkDoFnFunction(
-      OldDoFn<InputT, OutputT> doFn,
+      DoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
       PipelineOptions options) {
-    this.doFn = doFn;
+    this.doFn = DoFnAdapters.toOldDoFn(doFn);
     this.sideInputs = sideInputs;
     this.serializedOptions = new SerializedPipelineOptions(options);
     this.windowingStrategy = windowingStrategy;
 
-    this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
+    this.requiresWindowAccess =
+        DoFnSignatures.signatureForDoFn(doFn).processElement().observesWindow();
     this.hasSideInputs = !sideInputs.isEmpty();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index 7be4bb4..7f6a436 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -20,8 +20,11 @@ package org.apache.beam.runners.flink.translation.functions;
 import java.util.Map;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -54,16 +57,17 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
   private final WindowingStrategy<?, ?> windowingStrategy;
 
   public FlinkMultiOutputDoFnFunction(
-      OldDoFn<InputT, OutputT> doFn,
+      DoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
       PipelineOptions options,
       Map<TupleTag<?>, Integer> outputMap) {
-    this.doFn = doFn;
+    this.doFn = DoFnAdapters.toOldDoFn(doFn);
     this.serializedOptions = new SerializedPipelineOptions(options);
     this.outputMap = outputMap;
 
-    this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
+    this.requiresWindowAccess =
+        DoFnSignatures.signatureForDoFn(doFn).processElement().observesWindow();
     this.hasSideInputs = !sideInputs.isEmpty();
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index a29664b..6729aaa 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -88,7 +89,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>,
       TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT> {
 
-  protected OldDoFn<InputT, FnOutputT> doFn;
+  protected OldDoFn<InputT, FnOutputT> oldDoFn;
+
   protected final SerializedPipelineOptions serializedOptions;
 
   protected final TupleTag<FnOutputT> mainOutputTag;
@@ -117,8 +119,9 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   private transient Map<String, KvStateSnapshot<?, ?, ?, ?, ?>> restoredSideInputState;
 
+  @Deprecated
   public DoFnOperator(
-      OldDoFn<InputT, FnOutputT> doFn,
+      OldDoFn<InputT, FnOutputT> oldDoFn,
       TypeInformation<WindowedValue<InputT>> inputType,
       TupleTag<FnOutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
@@ -127,7 +130,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       Map<Integer, PCollectionView<?>> sideInputTagMapping,
       Collection<PCollectionView<?>> sideInputs,
       PipelineOptions options) {
-    this.doFn = doFn;
+    this.oldDoFn = oldDoFn;
     this.mainOutputTag = mainOutputTag;
     this.sideOutputTags = sideOutputTags;
     this.sideInputTagMapping = sideInputTagMapping;
@@ -148,21 +151,43 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     setChainingStrategy(ChainingStrategy.ALWAYS);
   }
 
+  public DoFnOperator(
+      DoFn<InputT, FnOutputT> doFn,
+      TypeInformation<WindowedValue<InputT>> inputType,
+      TupleTag<FnOutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      OutputManagerFactory<OutputT> outputManagerFactory,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Map<Integer, PCollectionView<?>> sideInputTagMapping,
+      Collection<PCollectionView<?>> sideInputs,
+      PipelineOptions options) {
+    this(
+        DoFnAdapters.toOldDoFn(doFn),
+        inputType,
+        mainOutputTag,
+        sideOutputTags,
+        outputManagerFactory,
+        windowingStrategy,
+        sideInputTagMapping,
+        sideInputs,
+        options);
+  }
+
   protected ExecutionContext.StepContext createStepContext() {
     return new StepContext();
   }
 
   // allow overriding this in WindowDoFnOperator because this one dynamically creates
   // the DoFn
-  protected OldDoFn<InputT, FnOutputT> getDoFn() {
-    return doFn;
+  protected OldDoFn<InputT, FnOutputT> getOldDoFn() {
+    return oldDoFn;
   }
 
   @Override
   public void open() throws Exception {
     super.open();
 
-    this.doFn = getDoFn();
+    this.oldDoFn = getOldDoFn();
 
     currentInputWatermark = Long.MIN_VALUE;
     currentOutputWatermark = currentInputWatermark;
@@ -220,7 +245,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
     DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.createDefault(
         serializedOptions.getPipelineOptions(),
-        doFn,
+        oldDoFn,
         sideInputReader,
         outputManagerFactory.create(output),
         mainOutputTag,
@@ -232,13 +257,13 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     pushbackDoFnRunner =
         PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
 
-    doFn.setup();
+    oldDoFn.setup();
   }
 
   @Override
   public void close() throws Exception {
     super.close();
-    doFn.teardown();
+    oldDoFn.teardown();
   }
 
   protected final long getPushbackWatermarkHold() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index f2d7f1c..9cea529 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -106,7 +106,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
       PipelineOptions options,
       Coder<K> keyCoder) {
     super(
-        null,
+        (OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) null,
         inputType,
         mainOutputTag,
         sideOutputTags,
@@ -124,7 +124,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
   }
 
   @Override
-  protected OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {
+  protected OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getOldDoFn() {
     StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() {
       @Override
       public StateInternals<K> stateInternalsForKey(K key) {
@@ -138,10 +138,10 @@ public class WindowDoFnOperator<K, InputT, OutputT>
     // has the window type as generic parameter while WindowingStrategy is almost always
     // untyped.
     @SuppressWarnings("unchecked")
-    OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn =
+    OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> oldDoFn =
         GroupAlsoByWindowViaWindowSetDoFn.create(
             windowingStrategy, stateInternalsFactory, (SystemReduceFn) systemReduceFn);
-    return doFn;
+    return oldDoFn;
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index e44a705..4c97cc7 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -30,7 +30,6 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -99,7 +98,7 @@ public class PipelineOptionsTest {
   @Test(expected = Exception.class)
   public void parDoBaseClassPipelineOptionsNullTest() {
     DoFnOperator<Object, Object, Object> doFnOperator = new DoFnOperator<>(
-        DoFnAdapters.toOldDoFn(new TestDoFn()),
+        new TestDoFn(),
         TypeInformation.of(new TypeHint<WindowedValue<Object>>() {}),
         new TupleTag<>("main-output"),
         Collections.<TupleTag<?>>emptyList(),
@@ -118,7 +117,7 @@ public class PipelineOptionsTest {
   public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception {
 
     DoFnOperator<Object, Object, Object> doFnOperator = new DoFnOperator<>(
-        DoFnAdapters.toOldDoFn(new TestDoFn()),
+        new TestDoFn(),
         TypeInformation.of(new TypeHint<WindowedValue<Object>>() {}),
         new TupleTag<>("main-output"),
         Collections.<TupleTag<?>>emptyList(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 65e244a..113802d 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -35,8 +35,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PCollectionViewTesting;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -97,7 +95,7 @@ public class DoFnOperatorTest {
     TupleTag<String> outputTag = new TupleTag<>("main-output");
 
     DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
-        DoFnAdapters.toOldDoFn(new IdentityDoFn<String>()),
+        new IdentityDoFn<String>(),
         coderTypeInfo,
         outputTag,
         Collections.<TupleTag<?>>emptyList(),
@@ -141,7 +139,7 @@ public class DoFnOperatorTest {
         .build();
 
     DoFnOperator<String, String, RawUnionValue> doFnOperator = new DoFnOperator<>(
-        DoFnAdapters.toOldDoFn(new MultiOutputDoFn(sideOutput1, sideOutput2)),
+        new MultiOutputDoFn(sideOutput1, sideOutput2),
         coderTypeInfo,
         mainOutput,
         ImmutableList.<TupleTag<?>>of(sideOutput1, sideOutput2),
@@ -201,7 +199,7 @@ public class DoFnOperatorTest {
             .build();
 
     DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
-        DoFnAdapters.toOldDoFn(new IdentityDoFn<String>()),
+        new IdentityDoFn<String>(),
         coderTypeInfo,
         outputTag,
         Collections.<TupleTag<?>>emptyList(),


[04/10] incubator-beam git commit: Removes code for wrapping DoFn as an OldDoFn

Posted by ke...@apache.org.
Removes code for wrapping DoFn as an OldDoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a22de150
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a22de150
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a22de150

Branch: refs/heads/master
Commit: a22de15012c51e8b7e31143021f0a298e093bf51
Parents: e9e53c5
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 17:21:40 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Dec 15 13:58:43 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/transforms/DoFnAdapters.java       | 150 ----------
 .../org/apache/beam/sdk/transforms/OldDoFn.java | 295 +------------------
 .../sdk/transforms/reflect/DoFnInvokers.java    | 141 +--------
 .../transforms/reflect/DoFnInvokersTest.java    |  36 ---
 4 files changed, 11 insertions(+), 611 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a22de150/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index e15b08b..d1c40a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -18,8 +18,6 @@
 package org.apache.beam.sdk.transforms;
 
 import java.io.IOException;
-import java.util.Collection;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
@@ -38,7 +36,6 @@ import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -53,18 +50,6 @@ public class DoFnAdapters {
   /** Should not be instantiated. */
   private DoFnAdapters() {}
 
-  /**
-   * If this is an {@link OldDoFn} produced via {@link #toOldDoFn}, returns the class of the
-   * original {@link DoFn}, otherwise returns {@code fn.getClass()}.
-   */
-  public static Class<?> getDoFnClass(OldDoFn<?, ?> fn) {
-    if (fn instanceof SimpleDoFnAdapter) {
-      return ((SimpleDoFnAdapter<?, ?>) fn).fn.getClass();
-    } else {
-      return fn.getClass();
-    }
-  }
-
   /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
   @SuppressWarnings({"unchecked", "rawtypes"})
   public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
@@ -76,126 +61,6 @@ public class DoFnAdapters {
     }
   }
 
-  /** Creates a {@link OldDoFn.ProcessContext} from a {@link DoFn.ProcessContext}. */
-  public static <InputT, OutputT> OldDoFn<InputT, OutputT>.ProcessContext adaptProcessContext(
-      OldDoFn<InputT, OutputT> fn,
-      final DoFn<InputT, OutputT>.ProcessContext c,
-      final DoFnInvoker.ArgumentProvider<InputT, OutputT> extra) {
-    return fn.new ProcessContext() {
-      @Override
-      public InputT element() {
-        return c.element();
-      }
-
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        return c.sideInput(view);
-      }
-
-      @Override
-      public Instant timestamp() {
-        return c.timestamp();
-      }
-
-      @Override
-      public BoundedWindow window() {
-        return extra.window();
-      }
-
-      @Override
-      public PaneInfo pane() {
-        return c.pane();
-      }
-
-      @Override
-      public WindowingInternals<InputT, OutputT> windowingInternals() {
-        return extra.windowingInternals();
-      }
-
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return c.getPipelineOptions();
-      }
-
-      @Override
-      public void output(OutputT output) {
-        c.output(output);
-      }
-
-      @Override
-      public void outputWithTimestamp(OutputT output, Instant timestamp) {
-        c.outputWithTimestamp(output, timestamp);
-      }
-
-      @Override
-      public <T> void sideOutput(TupleTag<T> tag, T output) {
-        c.sideOutput(tag, output);
-      }
-
-      @Override
-      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-        c.sideOutputWithTimestamp(tag, output, timestamp);
-      }
-
-      @Override
-      protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-          String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-        return c.createAggregator(name, combiner);
-      }
-    };
-  }
-
-  /** Creates a {@link OldDoFn.ProcessContext} from a {@link DoFn.ProcessContext}. */
-  public static <InputT, OutputT> OldDoFn<InputT, OutputT>.Context adaptContext(
-      OldDoFn<InputT, OutputT> fn,
-      final DoFn<InputT, OutputT>.Context c) {
-    return fn.new Context() {
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return c.getPipelineOptions();
-      }
-
-      @Override
-      public void output(OutputT output) {
-        c.output(output);
-      }
-
-      @Override
-      public void outputWithTimestamp(OutputT output, Instant timestamp) {
-        c.outputWithTimestamp(output, timestamp);
-      }
-
-      @Override
-      public <T> void sideOutput(TupleTag<T> tag, T output) {
-        c.sideOutput(tag, output);
-      }
-
-      @Override
-      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-        c.sideOutputWithTimestamp(tag, output, timestamp);
-      }
-
-      @Override
-      protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-          String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-        return c.createAggregator(name, combiner);
-      }
-    };
-  }
-
-  /**
-   * If the fn was created using {@link #toOldDoFn}, returns the original {@link DoFn}. Otherwise,
-   * returns {@code null}.
-   */
-  @Nullable
-  public static <InputT, OutputT> DoFn<InputT, OutputT> getDoFn(OldDoFn<InputT, OutputT> fn) {
-    if (fn instanceof SimpleDoFnAdapter) {
-      return ((SimpleDoFnAdapter<InputT, OutputT>) fn).fn;
-    } else {
-      return null;
-    }
-  }
-
   /**
    * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
    * OldDoFn}.
@@ -238,21 +103,6 @@ public class DoFnAdapters {
     }
 
     @Override
-    protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-      return fn.getInputTypeDescriptor();
-    }
-
-    @Override
-    protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-      return fn.getOutputTypeDescriptor();
-    }
-
-    @Override
-    Collection<Aggregator<?, ?>> getAggregators() {
-      return fn.getAggregators();
-    }
-
-    @Override
     public Duration getAllowedTimestampSkew() {
       return fn.getAllowedTimestampSkew();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a22de150/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index 2d2c1fd..0aef552 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -71,21 +70,6 @@ import org.joda.time.Instant;
  */
 @Deprecated
 public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData {
-
-  public DoFn<InputT, OutputT> toDoFn() {
-    DoFn<InputT, OutputT> doFn = DoFnAdapters.getDoFn(this);
-    if (doFn != null) {
-      return doFn;
-    }
-    if (this instanceof RequiresWindowAccess) {
-      // No parameters as it just accesses `this`
-      return new AdaptedRequiresWindowAccessDoFn();
-    } else {
-      // No parameters as it just accesses `this`
-      return new AdaptedDoFn();
-    }
-  }
-
   /**
    * Information accessible to all methods in this {@code OldDoFn}.
    * Used primarily to output elements.
@@ -334,7 +318,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
     this(new HashMap<String, DelegatingAggregator<?, ?>>());
   }
 
-  OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
+  public OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
     this.aggregators = aggregators;
   }
 
@@ -419,32 +403,6 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
   /////////////////////////////////////////////////////////////////////////////
 
   /**
-   * Returns a {@link TypeDescriptor} capturing what is known statically
-   * about the input type of this {@code OldDoFn} instance's most-derived
-   * class.
-   *
-   * <p>See {@link #getOutputTypeDescriptor} for more discussion.
-   */
-  protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-    return new TypeDescriptor<InputT>(getClass()) {};
-  }
-
-  /**
-   * Returns a {@link TypeDescriptor} capturing what is known statically
-   * about the output type of this {@code OldDoFn} instance's
-   * most-derived class.
-   *
-   * <p>In the normal case of a concrete {@code OldDoFn} subclass with
-   * no generic type parameters of its own (including anonymous inner
-   * classes), this will be a complete non-generic type, which is good
-   * for choosing a default output {@code Coder<OutputT>} for the output
-   * {@code PCollection<OutputT>}.
-   */
-  protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-    return new TypeDescriptor<OutputT>(getClass()) {};
-  }
-
-  /**
    * Returns an {@link Aggregator} with aggregation logic specified by the
    * {@link CombineFn} argument. The name provided must be unique across
    * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created
@@ -504,255 +462,4 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
   Collection<Aggregator<?, ?>> getAggregators() {
     return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values());
   }
-
-  /**
-   * A {@link Context} for an {@link OldDoFn} via a context for a proper {@link DoFn}.
-   */
-  private class AdaptedContext extends Context {
-
-    private final DoFn<InputT, OutputT>.Context newContext;
-
-    public AdaptedContext(
-        DoFn<InputT, OutputT>.Context newContext) {
-      this.newContext = newContext;
-      super.setupDelegateAggregators();
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return newContext.getPipelineOptions();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      newContext.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      newContext.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      newContext.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      newContext.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return newContext.createAggregator(name, combiner);
-    }
-  }
-
-  /**
-   * A {@link ProcessContext} for an {@link OldDoFn} via a context for a proper {@link DoFn}.
-   */
-  private class AdaptedProcessContext extends ProcessContext {
-
-    private final DoFn<InputT, OutputT>.ProcessContext newContext;
-
-    public AdaptedProcessContext(DoFn<InputT, OutputT>.ProcessContext newContext) {
-      this.newContext = newContext;
-    }
-
-    @Override
-    public InputT element() {
-      return newContext.element();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      return newContext.sideInput(view);
-    }
-
-    @Override
-    public Instant timestamp() {
-      return newContext.timestamp();
-    }
-
-    @Override
-    public BoundedWindow window() {
-      throw new UnsupportedOperationException(String.format(
-          "%s.%s.windowingInternals() is no longer supported. Please convert your %s to a %s",
-          OldDoFn.class.getSimpleName(),
-          OldDoFn.ProcessContext.class.getSimpleName(),
-          OldDoFn.class.getSimpleName(),
-          DoFn.class.getSimpleName()));
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return newContext.pane();
-    }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      throw new UnsupportedOperationException(String.format(
-          "%s.%s.windowingInternals() is no longer supported. Please convert your %s to a %s",
-          OldDoFn.class.getSimpleName(),
-          OldDoFn.ProcessContext.class.getSimpleName(),
-          OldDoFn.class.getSimpleName(),
-          DoFn.class.getSimpleName()));
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return newContext.getPipelineOptions();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      newContext.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      newContext.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      newContext.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      newContext.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return newContext.createAggregator(name, combiner);
-    }
-  }
-
-  private class AdaptedDoFn extends DoFn<InputT, OutputT> {
-
-    @Setup
-    public void setup() throws Exception {
-      OldDoFn.this.setup();
-    }
-
-    @StartBundle
-    public void startBundle(Context c) throws Exception {
-      OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c));
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      OldDoFn.this.processElement(OldDoFn.this.new AdaptedProcessContext(c));
-    }
-
-    @FinishBundle
-    public void finishBundle(Context c) throws Exception {
-      OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c));
-    }
-
-    @Teardown
-    public void teardown() throws Exception {
-      OldDoFn.this.teardown();
-    }
-
-    @Override
-    public Duration getAllowedTimestampSkew() {
-      return OldDoFn.this.getAllowedTimestampSkew();
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      OldDoFn.this.populateDisplayData(builder);
-    }
-
-    @Override
-    public TypeDescriptor<InputT> getInputTypeDescriptor() {
-      return OldDoFn.this.getInputTypeDescriptor();
-    }
-
-    @Override
-    Collection<Aggregator<?, ?>> getAggregators() {
-      return OldDoFn.this.getAggregators();
-    }
-
-    @Override
-    public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-      return OldDoFn.this.getOutputTypeDescriptor();
-    }
-  }
-
-  /**
-   * A {@link ProcessContext} for an {@link OldDoFn} that implements
-   * {@link RequiresWindowAccess}, via a context for a proper {@link DoFn}.
-   */
-  private class AdaptedRequiresWindowAccessProcessContext extends AdaptedProcessContext {
-
-    private final BoundedWindow window;
-
-    public AdaptedRequiresWindowAccessProcessContext(
-        DoFn<InputT, OutputT>.ProcessContext newContext,
-        BoundedWindow window) {
-      super(newContext);
-      this.window = window;
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return window;
-    }
-  }
-
-  private class AdaptedRequiresWindowAccessDoFn extends DoFn<InputT, OutputT> {
-
-    @Setup
-    public void setup() throws Exception {
-      OldDoFn.this.setup();
-    }
-
-    @StartBundle
-    public void startBundle(Context c) throws Exception {
-      OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c));
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      OldDoFn.this.processElement(
-          OldDoFn.this.new AdaptedRequiresWindowAccessProcessContext(c, window));
-    }
-
-    @FinishBundle
-    public void finishBundle(Context c) throws Exception {
-      OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c));
-    }
-
-    @Teardown
-    public void teardown() throws Exception {
-      OldDoFn.this.teardown();
-    }
-
-    @Override
-    public Duration getAllowedTimestampSkew() {
-      return OldDoFn.this.getAllowedTimestampSkew();
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      OldDoFn.this.populateDisplayData(builder);
-    }
-
-    @Override
-    public TypeDescriptor<InputT> getInputTypeDescriptor() {
-      return OldDoFn.this.getInputTypeDescriptor();
-    }
-
-    @Override
-    public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-      return OldDoFn.this.getOutputTypeDescriptor();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a22de150/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index 50a7082..b141d51 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -18,13 +18,7 @@
 package org.apache.beam.sdk.transforms.reflect;
 
 import java.io.Serializable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.util.UserCodeException;
 
 /** Static utilities for working with {@link DoFnInvoker}. */
 public class DoFnInvokers {
@@ -42,137 +36,22 @@ public class DoFnInvokers {
     return ByteBuddyDoFnInvokerFactory.only().newByteBuddyInvoker(fn);
   }
 
-  private DoFnInvokers() {}
-
   /**
-   * Returns a {@link DoFnInvoker} for the given {@link Object}, which should be either a {@link
-   * DoFn} or an {@link OldDoFn}. The expected use would be to deserialize a user's function as an
-   * {@link Object} and then pass it to this method, so there is no need to statically specify what
-   * sort of object it is.
+   * Temporarily retained for compatibility with Dataflow worker.
+   * TODO: delete this when Dataflow worker is fixed to call {@link #invokerFor(DoFn)}.
    *
-   * @deprecated this is to be used only as a migration path for decoupling upgrades
+   * @deprecated Use {@link #invokerFor(DoFn)}.
    */
+  @SuppressWarnings("unchecked")
   @Deprecated
-  public static DoFnInvoker<?, ?> invokerFor(Serializable deserializedFn) {
+  public static <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(
+      Serializable deserializedFn) {
     if (deserializedFn instanceof DoFn) {
-      return invokerFor((DoFn<?, ?>) deserializedFn);
-    } else if (deserializedFn instanceof OldDoFn) {
-      return new OldDoFnInvoker<>((OldDoFn<?, ?>) deserializedFn);
-    } else {
-      throw new IllegalArgumentException(
-          String.format(
-              "Cannot create a %s for %s; it should be either a %s or an %s.",
-              DoFnInvoker.class.getSimpleName(),
-              deserializedFn.toString(),
-              DoFn.class.getSimpleName(),
-              OldDoFn.class.getSimpleName()));
+      return invokerFor((DoFn<InputT, OutputT>) deserializedFn);
     }
+    throw new UnsupportedOperationException(
+        "Only DoFn supported, was: " + deserializedFn.getClass());
   }
 
-  /** @deprecated use {@link DoFnInvokers#invokerFor(DoFn)}. */
-  @Deprecated public static final DoFnInvokers INSTANCE = new DoFnInvokers();
-
-  /** @deprecated use {@link DoFnInvokers#invokerFor(DoFn)}. */
-  @Deprecated
-  public <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(Object deserializedFn) {
-    return (DoFnInvoker<InputT, OutputT>) DoFnInvokers.invokerFor((Serializable) deserializedFn);
-  }
-
-
-  static class OldDoFnInvoker<InputT, OutputT> implements DoFnInvoker<InputT, OutputT> {
-
-    private final OldDoFn<InputT, OutputT> fn;
-
-    public OldDoFnInvoker(OldDoFn<InputT, OutputT> fn) {
-      this.fn = fn;
-    }
-
-    @Override
-    public DoFn.ProcessContinuation invokeProcessElement(
-        ArgumentProvider<InputT, OutputT> extra) {
-      // The outer DoFn is immaterial - it exists only to avoid typing InputT and OutputT repeatedly
-      DoFn<InputT, OutputT>.ProcessContext newCtx =
-          extra.processContext(new DoFn<InputT, OutputT>() {});
-      OldDoFn<InputT, OutputT>.ProcessContext oldCtx =
-          DoFnAdapters.adaptProcessContext(fn, newCtx, extra);
-      try {
-        fn.processElement(oldCtx);
-        return DoFn.ProcessContinuation.stop();
-      } catch (Throwable exc) {
-        throw UserCodeException.wrap(exc);
-      }
-    }
-
-    @Override
-    public void invokeOnTimer(String timerId, ArgumentProvider<InputT, OutputT> arguments) {
-      throw new UnsupportedOperationException(
-          String.format("Timers are not supported for %s", OldDoFn.class.getSimpleName()));
-    }
-
-    @Override
-    public void invokeStartBundle(DoFn.Context c) {
-      OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c);
-      try {
-        fn.startBundle(oldCtx);
-      } catch (Throwable exc) {
-        throw UserCodeException.wrap(exc);
-      }
-    }
-
-    @Override
-    public void invokeFinishBundle(DoFn.Context c) {
-      OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c);
-      try {
-        fn.finishBundle(oldCtx);
-      } catch (Throwable exc) {
-        throw UserCodeException.wrap(exc);
-      }
-    }
-
-    @Override
-    public void invokeSetup() {
-      try {
-        fn.setup();
-      } catch (Throwable exc) {
-        throw UserCodeException.wrap(exc);
-      }
-    }
-
-    @Override
-    public void invokeTeardown() {
-      try {
-        fn.teardown();
-      } catch (Throwable exc) {
-        throw UserCodeException.wrap(exc);
-      }
-    }
-
-    @Override
-    public <RestrictionT> RestrictionT invokeGetInitialRestriction(InputT element) {
-      throw new UnsupportedOperationException("OldDoFn is not splittable");
-    }
-
-    @Override
-    public <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(
-        CoderRegistry coderRegistry) {
-      throw new UnsupportedOperationException("OldDoFn is not splittable");
-    }
-
-    @Override
-    public <RestrictionT> void invokeSplitRestriction(
-        InputT element, RestrictionT restriction, DoFn.OutputReceiver<RestrictionT> receiver) {
-      throw new UnsupportedOperationException("OldDoFn is not splittable");
-    }
-
-    @Override
-    public <RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
-        TrackerT invokeNewTracker(RestrictionT restriction) {
-      throw new UnsupportedOperationException("OldDoFn is not splittable");
-    }
-
-    @Override
-    public DoFn<InputT, OutputT> getFn() {
-      throw new UnsupportedOperationException("getFn is not supported for OldDoFn");
-    }
-  }
+  private DoFnInvokers() {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a22de150/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 456a6eb..55b8e7e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -738,39 +737,4 @@ public class DoFnInvokersTest {
     invoker.invokeOnTimer(timerId, mockArgumentProvider);
     assertThat(fn.window, equalTo(testWindow));
   }
-
-  private class OldDoFnIdentity extends OldDoFn<String, String> {
-    public void processElement(ProcessContext c) {}
-  }
-
-  @Test
-  public void testOldDoFnProcessElement() throws Exception {
-    new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn)
-        .invokeProcessElement(mockArgumentProvider);
-    verify(mockOldDoFn).processElement(any(OldDoFn.ProcessContext.class));
-  }
-
-  @Test
-  public void testOldDoFnStartBundle() throws Exception {
-    new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeStartBundle(mockProcessContext);
-    verify(mockOldDoFn).startBundle(any(OldDoFn.Context.class));
-  }
-
-  @Test
-  public void testOldDoFnFinishBundle() throws Exception {
-    new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeFinishBundle(mockProcessContext);
-    verify(mockOldDoFn).finishBundle(any(OldDoFn.Context.class));
-  }
-
-  @Test
-  public void testOldDoFnSetup() throws Exception {
-    new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeSetup();
-    verify(mockOldDoFn).setup();
-  }
-
-  @Test
-  public void testOldDoFnTeardown() throws Exception {
-    new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeTeardown();
-    verify(mockOldDoFn).teardown();
-  }
 }


[10/10] incubator-beam git commit: This closes #1565

Posted by ke...@apache.org.
This closes #1565

  Renames ParDo.getNewFn to getFn
  Moves DoFnAdapters to runners-core
  Removes unused code from NoOpOldDoFn
  Removes ArgumentProvider.windowingInternals
  Removes code for wrapping DoFn as an OldDoFn
  Removes OldDoFn from ParDo
  Pushes uses of OldDoFn deeper inside Flink runner
  Remove ParDo.of(OldDoFn) from Apex runner
  Converts all easy OldDoFns to DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5a3ace4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5a3ace4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5a3ace4a

Branch: refs/heads/master
Commit: 5a3ace4a7c786938e5286e921f3afcd23b26de26
Parents: 3e1a628 6b502fc
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Dec 15 14:04:03 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 15 14:04:03 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    |  57 +--
 .../translation/ApexPipelineTranslator.java     |   2 +
 .../translation/ParDoBoundMultiTranslator.java  |   6 +-
 .../apex/translation/ParDoBoundTranslator.java  |   6 +-
 .../apex/translation/WindowBoundTranslator.java |  78 +++
 .../operators/ApexGroupByKeyOperator.java       |   2 +-
 .../operators/ApexParDoOperator.java            |  25 +-
 .../FlattenPCollectionTranslatorTest.java       |  15 +-
 .../translation/GroupByKeyTranslatorTest.java   |  21 +-
 .../translation/ParDoBoundTranslatorTest.java   |  38 +-
 .../translation/ReadUnboundTranslatorTest.java  |  15 +-
 .../apache/beam/runners/core/AssignWindows.java |  46 --
 .../apache/beam/runners/core/DoFnAdapters.java  | 344 +++++++++++++
 .../beam/runners/core/SimpleDoFnRunner.java     |  57 ---
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   4 +-
 .../beam/runners/core/SplittableParDo.java      |  11 +-
 .../core/GroupAlsoByWindowsProperties.java      |   2 +-
 .../runners/direct/ParDoEvaluatorFactory.java   |   2 +-
 .../direct/ParDoMultiOverrideFactory.java       |   2 +-
 .../ParDoSingleViaMultiOverrideFactory.java     |   4 +-
 .../direct/StatefulParDoEvaluatorFactory.java   |   4 +-
 .../apache/beam/runners/flink/FlinkRunner.java  |  10 +-
 .../FlinkBatchTransformTranslators.java         |  13 +-
 .../FlinkStreamingTransformTranslators.java     |   4 +-
 .../functions/FlinkDoFnFunction.java            |  10 +-
 .../functions/FlinkMultiOutputDoFnFunction.java |  10 +-
 .../functions/FlinkProcessContextBase.java      |   2 +-
 .../wrappers/streaming/DoFnOperator.java        |  43 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   8 +-
 .../beam/runners/flink/PipelineOptionsTest.java |   6 +-
 .../flink/streaming/DoFnOperatorTest.java       |  13 +-
 .../flink/streaming/GroupByNullKeyTest.java     |  18 +-
 .../streaming/TopWikipediaSessionsITCase.java   |  10 +-
 .../dataflow/DataflowPipelineTranslator.java    |   8 +-
 .../spark/translation/SparkAssignWindowFn.java  |   2 +-
 .../spark/translation/TransformTranslator.java  |   4 +-
 .../streaming/StreamingTransformTranslator.java |   4 +-
 .../beam/sdk/AggregatorPipelineExtractor.java   |   4 +-
 .../sdk/transforms/AggregatorRetriever.java     |  13 +-
 .../beam/sdk/transforms/DoFnAdapters.java       | 504 -------------------
 .../apache/beam/sdk/transforms/DoFnTester.java  |   7 -
 .../org/apache/beam/sdk/transforms/OldDoFn.java | 297 +----------
 .../org/apache/beam/sdk/transforms/ParDo.java   | 173 +------
 .../sdk/transforms/reflect/DoFnInvoker.java     |  20 -
 .../sdk/transforms/reflect/DoFnInvokers.java    | 141 +-----
 .../sdk/AggregatorPipelineExtractorTest.java    |  12 +-
 .../apache/beam/sdk/transforms/NoOpOldDoFn.java |  74 +--
 .../apache/beam/sdk/transforms/OldDoFnTest.java | 125 ++---
 .../transforms/reflect/DoFnInvokersTest.java    |  42 --
 49 files changed, 689 insertions(+), 1629 deletions(-)
----------------------------------------------------------------------



[03/10] incubator-beam git commit: Remove ParDo.of(OldDoFn) from Apex runner

Posted by ke...@apache.org.
Remove ParDo.of(OldDoFn) from Apex runner

The only such usage was of AssignWindowsDoFn. Now, instead, it is
instantiated directly using a new translator for Window.Bound.

This change also separates the overloads of ApexParDoOperator for old
and new DoFn, to make the OldDoFn overload easier to track and later
remove.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/af616d97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/af616d97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/af616d97

Branch: refs/heads/master
Commit: af616d9741b19d0a7705df6fe075be1509aa659f
Parents: f5f329e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 16:31:42 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Dec 15 13:55:24 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    | 46 +-----------
 .../translation/ApexPipelineTranslator.java     |  2 +
 .../translation/ParDoBoundMultiTranslator.java  |  4 +-
 .../apex/translation/ParDoBoundTranslator.java  |  4 +-
 .../apex/translation/WindowBoundTranslator.java | 78 ++++++++++++++++++++
 .../operators/ApexParDoOperator.java            | 25 ++++++-
 .../translation/ParDoBoundTranslatorTest.java   |  3 +-
 .../apache/beam/runners/core/AssignWindows.java | 46 ------------
 .../spark/translation/SparkAssignWindowFn.java  |  2 +-
 9 files changed, 108 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index e5bde46..f12ebef 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -32,7 +32,6 @@ import org.apache.apex.api.Launcher;
 import org.apache.apex.api.Launcher.AppHandle;
 import org.apache.apex.api.Launcher.LaunchMode;
 import org.apache.beam.runners.apex.translation.ApexPipelineTranslator;
-import org.apache.beam.runners.core.AssignWindows;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
@@ -46,9 +45,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
@@ -90,10 +86,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
   public <OutputT extends POutput, InputT extends PInput> OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
 
-    if (Window.Bound.class.equals(transform.getClass())) {
-      return (OutputT) ((PCollection) input).apply(
-          new AssignWindowsAndSetStrategy((Window.Bound) transform));
-    } else if (Create.Values.class.equals(transform.getClass())) {
+    if (Create.Values.class.equals(transform.getClass())) {
       return (OutputT) PCollection
           .<OutputT>createPrimitiveOutputInternal(
               input.getPipeline(),
@@ -162,43 +155,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
 
   }
 
-  /**
-   * copied from DirectPipelineRunner.
-   * used to replace Window.Bound till equivalent function is added in Apex
-   */
-  private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
-      extends PTransform<PCollection<T>, PCollection<T>> {
-
-    private final Window.Bound<T> wrapped;
-
-    public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
-      this.wrapped = wrapped;
-    }
-
-    @Override
-    public PCollection<T> expand(PCollection<T> input) {
-      WindowingStrategy<?, ?> outputStrategy =
-          wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
-
-      WindowFn<T, BoundedWindow> windowFn =
-          (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
-
-      // If the Window.Bound transform only changed parts other than the WindowFn, then
-      // we skip AssignWindows even though it should be harmless in a perfect world.
-      // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
-      // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
-      // AssignWindows in this case.
-      if (wrapped.getWindowFn() == null) {
-        return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
-            .setWindowingStrategyInternal(outputStrategy);
-      } else {
-        return input
-            .apply("AssignWindows", new AssignWindows<>(windowFn))
-            .setWindowingStrategyInternal(outputStrategy);
-      }
-    }
-  }
-
   private static class IdentityFn<T> extends DoFn<T, T> {
     private static final long serialVersionUID = 1L;
     @ProcessElement

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index 8d6db84..c8e0290 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.slf4j.Logger;
@@ -70,6 +71,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
         new CreateApexPCollectionViewTranslator());
     registerTransformTranslator(CreatePCollectionView.class,
         new CreatePCollectionViewTranslator());
+    registerTransformTranslator(Window.Bound.class, new WindowBoundTranslator());
   }
 
   public ApexPipelineTranslator(ApexPipelineOptions options) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
index 706482a..574ce8f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
@@ -31,7 +31,6 @@ import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
@@ -77,7 +76,6 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
               ApexRunner.class.getSimpleName()));
     }
 
-    OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
     PCollectionTuple output = context.getOutput();
     PCollection<InputT> input = context.getInput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
@@ -89,7 +87,7 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
     ApexParDoOperator<InputT, OutputT> operator =
         new ApexParDoOperator<>(
             context.getPipelineOptions(),
-            oldDoFn,
+            doFn,
             transform.getMainOutputTag(),
             transform.getSideOutputTags().getAll(),
             context.<PCollection<?>>getInput().getWindowingStrategy(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
index b5a50f6..de78628 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
@@ -23,7 +23,6 @@ import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
@@ -64,7 +63,6 @@ class ParDoBoundTranslator<InputT, OutputT>
               ApexRunner.class.getSimpleName()));
     }
 
-    OldDoFn<InputT, OutputT> oldDoFn = transform.getOldFn();
     PCollection<OutputT> output = context.getOutput();
     PCollection<InputT> input = context.getInput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
@@ -76,7 +74,7 @@ class ParDoBoundTranslator<InputT, OutputT>
     ApexParDoOperator<InputT, OutputT> operator =
         new ApexParDoOperator<>(
             context.getPipelineOptions(),
-            oldDoFn,
+            doFn,
             new TupleTag<OutputT>(),
             TupleTagList.empty().getAll() /*sideOutputTags*/,
             output.getWindowingStrategy(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
new file mode 100644
index 0000000..33b9269
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import java.util.Collections;
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
+import org.apache.beam.runners.core.AssignWindowsDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+/**
+ * {@link Window.Bound} is translated to an {@link ApexParDoOperator} that wraps an {@link
+ * AssignWindowsDoFn}.
+ */
+class WindowBoundTranslator<T> implements TransformTranslator<Window.Bound<T>> {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public void translate(Window.Bound<T> transform, TranslationContext context) {
+    PCollection<T> output = context.getOutput();
+    PCollection<T> input = context.getInput();
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<T, BoundedWindow> windowingStrategy =
+        (WindowingStrategy<T, BoundedWindow>) output.getWindowingStrategy();
+
+    OldDoFn<T, T> fn =
+        (transform.getWindowFn() == null)
+            ? DoFnAdapters.toOldDoFn(new IdentityFn<T>())
+            : new AssignWindowsDoFn<>(transform.getWindowFn());
+
+    ApexParDoOperator<T, T> operator =
+        new ApexParDoOperator<T, T>(
+            context.getPipelineOptions().as(ApexPipelineOptions.class),
+            fn,
+            new TupleTag<T>(),
+            TupleTagList.empty().getAll(),
+            windowingStrategy,
+            Collections.<PCollectionView<?>>emptyList(),
+            WindowedValue.getFullCoder(
+                input.getCoder(), windowingStrategy.getWindowFn().windowCoder()),
+            context.<Void>stateInternalsFactory());
+    context.addOperator(operator, operator.output);
+    context.addStream(context.getInput(), operator.input);
+  }
+
+  private static class IdentityFn<T> extends DoFn<T, T> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 637c3ff..08f062d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -28,11 +28,9 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
@@ -50,6 +48,7 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.NullSideInputReader;
@@ -95,6 +94,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
   private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping =
       Maps.newHashMapWithExpectedSize(5);
 
+  @Deprecated
   public ApexParDoOperator(
       ApexPipelineOptions pipelineOptions,
       OldDoFn<InputT, OutputT> doFn,
@@ -125,6 +125,27 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
 
   }
 
+  public ApexParDoOperator(
+      ApexPipelineOptions pipelineOptions,
+      DoFn<InputT, OutputT> doFn,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      WindowingStrategy<?, ?> windowingStrategy,
+      List<PCollectionView<?>> sideInputs,
+      Coder<WindowedValue<InputT>> inputCoder,
+      StateInternalsFactory<Void> stateInternalsFactory
+      ) {
+    this(
+        pipelineOptions,
+        DoFnAdapters.toOldDoFn(doFn),
+        mainOutputTag,
+        sideOutputTags,
+        windowingStrategy,
+        sideInputs,
+        inputCoder,
+        stateInternalsFactory);
+  }
+
   @SuppressWarnings("unused") // for Kryo
   private ApexParDoOperator() {
     this.pipelineOptions = null;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
index 28b2ec9..fa94b2a 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
@@ -49,7 +49,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
@@ -208,7 +207,7 @@ public class ParDoBoundTranslatorTest {
     ApexParDoOperator<Integer, Integer> operator =
         new ApexParDoOperator<>(
             options,
-            DoFnAdapters.toOldDoFn(new Add(singletonView)),
+            new Add(singletonView),
             new TupleTag<Integer>(),
             TupleTagList.empty().getAll(),
             WindowingStrategy.globalDefault(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
deleted file mode 100644
index 375932a..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * {@link PTransform} that uses privileged (non-user-facing) APIs to assign elements of a
- * {@link PCollection} to windows according to the provided {@link WindowFn}.
- *
- * @param <T> Type of elements being windowed
- * @param <W> Window type
- */
-public class AssignWindows<T, W extends BoundedWindow>
-    extends PTransform<PCollection<T>, PCollection<T>> {
-
-  private WindowFn<? super T, W> fn;
-
-  public AssignWindows(WindowFn<? super T, W> fn) {
-    this.fn = fn;
-  }
-
-  @Override
-  public PCollection<T> expand(PCollection<T> input) {
-    return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<>(fn)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
index 9d7ed7d..18a3dd8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
@@ -29,7 +29,7 @@ import org.joda.time.Instant;
 
 
 /**
- * An implementation of {@link org.apache.beam.runners.core.AssignWindows} for the Spark runner.
+ * An implementation of {@link org.apache.beam.runners.core.AssignWindowsDoFn} for the Spark runner.
  */
 public class SparkAssignWindowFn<T, W extends BoundedWindow>
     implements Function<WindowedValue<T>, WindowedValue<T>> {


[09/10] incubator-beam git commit: Renames ParDo.getNewFn to getFn

Posted by ke...@apache.org.
Renames ParDo.getNewFn to getFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6b502fc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6b502fc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6b502fc1

Branch: refs/heads/master
Commit: 6b502fc111af266c7b1a0e6f7d473c36f57281a2
Parents: 33ed323
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 17:29:41 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Dec 15 13:59:11 2016 -0800

----------------------------------------------------------------------
 .../translation/ParDoBoundMultiTranslator.java  |  2 +-
 .../apex/translation/ParDoBoundTranslator.java  |  2 +-
 .../beam/runners/core/SplittableParDo.java      |  4 ++--
 .../runners/direct/ParDoEvaluatorFactory.java   |  2 +-
 .../direct/ParDoMultiOverrideFactory.java       |  2 +-
 .../ParDoSingleViaMultiOverrideFactory.java     |  4 ++--
 .../direct/StatefulParDoEvaluatorFactory.java   |  4 ++--
 .../FlinkBatchTransformTranslators.java         |  4 ++--
 .../FlinkStreamingTransformTranslators.java     | 12 +++++------
 .../dataflow/DataflowPipelineTranslator.java    |  8 +++----
 .../spark/translation/TransformTranslator.java  |  4 ++--
 .../streaming/StreamingTransformTranslator.java |  4 ++--
 .../beam/sdk/AggregatorPipelineExtractor.java   |  4 ++--
 .../org/apache/beam/sdk/transforms/ParDo.java   | 22 ++++++++++----------
 .../sdk/AggregatorPipelineExtractorTest.java    | 12 +++++------
 15 files changed, 45 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
index 574ce8f..bff7652 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
@@ -53,7 +53,7 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
 
   @Override
   public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
-    DoFn<InputT, OutputT> doFn = transform.getNewFn();
+    DoFn<InputT, OutputT> doFn = transform.getFn();
     DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
 
     if (signature.stateDeclarations().size() > 0) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
index de78628..3b6eb6e 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
@@ -40,7 +40,7 @@ class ParDoBoundTranslator<InputT, OutputT>
 
   @Override
   public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
-    DoFn<InputT, OutputT> doFn = transform.getNewFn();
+    DoFn<InputT, OutputT> doFn = transform.getFn();
     DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
 
     if (signature.stateDeclarations().size() > 0) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 720db63..f8d12ec 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -104,7 +104,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     checkNotNull(parDo, "parDo must not be null");
     this.parDo = parDo;
     checkArgument(
-        DoFnSignatures.getSignature(parDo.getNewFn().getClass()).processElement().isSplittable(),
+        DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(),
         "fn must be a splittable DoFn");
   }
 
@@ -114,7 +114,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
   }
 
   private PCollectionTuple applyTyped(PCollection<InputT> input) {
-    DoFn<InputT, OutputT> fn = parDo.getNewFn();
+    DoFn<InputT, OutputT> fn = parDo.getFn();
     Coder<RestrictionT> restrictionCoder =
         DoFnInvokers.invokerFor(fn)
             .invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index ec5dc2c..b4684e3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -65,7 +65,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
                 application;
 
     ParDo.BoundMulti<InputT, OutputT> transform = parDoApplication.getTransform();
-    final DoFn<InputT, OutputT> doFn = transform.getNewFn();
+    final DoFn<InputT, OutputT> doFn = transform.getFn();
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     TransformEvaluator<T> evaluator =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 4e7914f..4401434 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -46,7 +46,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
   public PTransform<PCollection<? extends InputT>, PCollectionTuple> override(
       ParDo.BoundMulti<InputT, OutputT> transform) {
 
-    DoFn<InputT, OutputT> fn = transform.getNewFn();
+    DoFn<InputT, OutputT> fn = transform.getFn();
     DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
     if (signature.processElement().isSplittable()) {
       return new SplittableParDo(transform);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
index 10530bb..5fcf49c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
@@ -56,12 +56,12 @@ class ParDoSingleViaMultiOverrideFactory<InputT, OutputT>
 
       PCollectionTuple outputs =
           input.apply(
-              ParDo.of(underlyingParDo.getNewFn())
+              ParDo.of(underlyingParDo.getFn())
                   .withSideInputs(underlyingParDo.getSideInputs())
                   .withOutputTags(mainOutputTag, TupleTagList.empty()));
       PCollection<OutputT> output = outputs.get(mainOutputTag);
 
-      output.setTypeDescriptor(underlyingParDo.getNewFn().getOutputTypeDescriptor());
+      output.setTypeDescriptor(underlyingParDo.getFn().getOutputTypeDescriptor());
       return output;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 1f3286c..1f64d9a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -86,7 +86,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
       throws Exception {
 
     final DoFn<KV<K, InputT>, OutputT> doFn =
-        application.getTransform().getUnderlyingParDo().getNewFn();
+        application.getTransform().getUnderlyingParDo().getFn();
     final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
 
     // If the DoFn is stateful, schedule state clearing.
@@ -141,7 +141,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
       WindowingStrategy<?, ?> windowingStrategy = pc.getWindowingStrategy();
       BoundedWindow window = transformOutputWindow.getWindow();
       final DoFn<?, ?> doFn =
-          transformOutputWindow.getTransform().getTransform().getUnderlyingParDo().getNewFn();
+          transformOutputWindow.getTransform().getTransform().getUnderlyingParDo().getFn();
       final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
 
       final DirectStepContext stepContext =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 497b293..eb625b2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -516,7 +516,7 @@ class FlinkBatchTransformTranslators {
         ParDo.Bound<InputT, OutputT> transform,
 
         FlinkBatchTranslationContext context) {
-      DoFn<InputT, OutputT> doFn = transform.getNewFn();
+      DoFn<InputT, OutputT> doFn = transform.getFn();
       rejectStateAndTimers(doFn);
 
       DataSet<WindowedValue<InputT>> inputDataSet =
@@ -562,7 +562,7 @@ class FlinkBatchTransformTranslators {
     public void translateNode(
         ParDo.BoundMulti<InputT, OutputT> transform,
         FlinkBatchTranslationContext context) {
-      DoFn<InputT, OutputT> doFn = transform.getNewFn();
+      DoFn<InputT, OutputT> doFn = transform.getFn();
       rejectStateAndTimers(doFn);
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 42ef630..ffa6d16 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -338,7 +338,7 @@ public class FlinkStreamingTransformTranslators {
         ParDo.Bound<InputT, OutputT> transform,
         FlinkStreamingTranslationContext context) {
 
-      DoFn<InputT, OutputT> doFn = transform.getNewFn();
+      DoFn<InputT, OutputT> doFn = transform.getFn();
       rejectStateAndTimers(doFn);
 
       WindowingStrategy<?, ?> windowingStrategy =
@@ -358,7 +358,7 @@ public class FlinkStreamingTransformTranslators {
       if (sideInputs.isEmpty()) {
         DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
             new DoFnOperator<>(
-                transform.getNewFn(),
+                transform.getFn(),
                 inputTypeInfo,
                 new TupleTag<OutputT>("main output"),
                 Collections.<TupleTag<?>>emptyList(),
@@ -381,7 +381,7 @@ public class FlinkStreamingTransformTranslators {
 
         DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
             new DoFnOperator<>(
-                transform.getNewFn(),
+                transform.getFn(),
                 inputTypeInfo,
                 new TupleTag<OutputT>("main output"),
                 Collections.<TupleTag<?>>emptyList(),
@@ -490,7 +490,7 @@ public class FlinkStreamingTransformTranslators {
         ParDo.BoundMulti<InputT, OutputT> transform,
         FlinkStreamingTranslationContext context) {
 
-      DoFn<InputT, OutputT> doFn = transform.getNewFn();
+      DoFn<InputT, OutputT> doFn = transform.getFn();
       rejectStateAndTimers(doFn);
 
       // we assume that the transformation does not change the windowing strategy.
@@ -515,7 +515,7 @@ public class FlinkStreamingTransformTranslators {
       if (sideInputs.isEmpty()) {
         DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
             new DoFnOperator<>(
-                transform.getNewFn(),
+                transform.getFn(),
                 inputTypeInfo,
                 transform.getMainOutputTag(),
                 transform.getSideOutputTags().getAll(),
@@ -542,7 +542,7 @@ public class FlinkStreamingTransformTranslators {
 
         DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
             new DoFnOperator<>(
-                transform.getNewFn(),
+                transform.getFn(),
                 inputTypeInfo,
                 transform.getMainOutputTag(),
                 transform.getSideOutputTags().getAll(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index a56690c..8d2b076 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -955,14 +955,14 @@ public class DataflowPipelineTranslator {
           private <InputT, OutputT> void translateMultiHelper(
               ParDo.BoundMulti<InputT, OutputT> transform,
               TranslationContext context) {
-            rejectStatefulDoFn(transform.getNewFn());
+            rejectStatefulDoFn(transform.getFn());
 
             context.addStep(transform, "ParallelDo");
             translateInputs(context.getInput(transform), transform.getSideInputs(), context);
             BiMap<Long, TupleTag<?>> outputMap =
                 translateOutputs(context.getOutput(transform), context);
             translateFn(
-                transform.getNewFn(),
+                transform.getFn(),
                 context.getInput(transform).getWindowingStrategy(),
                 transform.getSideInputs(),
                 context.getInput(transform).getCoder(),
@@ -985,13 +985,13 @@ public class DataflowPipelineTranslator {
           private <InputT, OutputT> void translateSingleHelper(
               ParDo.Bound<InputT, OutputT> transform,
               TranslationContext context) {
-            rejectStatefulDoFn(transform.getNewFn());
+            rejectStatefulDoFn(transform.getFn());
 
             context.addStep(transform, "ParallelDo");
             translateInputs(context.getInput(transform), transform.getSideInputs(), context);
             long mainOutput = context.addOutput(context.getOutput(transform));
             translateFn(
-                transform.getNewFn(),
+                transform.getFn(),
                 context.getInput(transform).getWindowingStrategy(),
                 transform.getSideInputs(),
                 context.getInput(transform).getCoder(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index ac91892..5dd6beb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -227,7 +227,7 @@ public final class TransformTranslator {
     return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
       @Override
       public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
-        DoFn<InputT, OutputT> doFn = transform.getNewFn();
+        DoFn<InputT, OutputT> doFn = transform.getFn();
         rejectStateAndTimers(doFn);
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<InputT>> inRDD =
@@ -250,7 +250,7 @@ public final class TransformTranslator {
     return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() {
       @Override
       public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
-        DoFn<InputT, OutputT> doFn = transform.getNewFn();
+        DoFn<InputT, OutputT> doFn = transform.getFn();
         rejectStateAndTimers(doFn);
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<InputT>> inRDD =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 27204ed..070ccbb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -344,7 +344,7 @@ final class StreamingTransformTranslator {
       @Override
       public void evaluate(final ParDo.Bound<InputT, OutputT> transform,
                            final EvaluationContext context) {
-        final DoFn<InputT, OutputT> doFn = transform.getNewFn();
+        final DoFn<InputT, OutputT> doFn = transform.getFn();
         rejectStateAndTimers(doFn);
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
@@ -378,7 +378,7 @@ final class StreamingTransformTranslator {
       @Override
       public void evaluate(final ParDo.BoundMulti<InputT, OutputT> transform,
                            final EvaluationContext context) {
-        final DoFn<InputT, OutputT> doFn = transform.getNewFn();
+        final DoFn<InputT, OutputT> doFn = transform.getFn();
         rejectStateAndTimers(doFn);
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
index ade5978..c79f779 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
@@ -70,10 +70,10 @@ class AggregatorPipelineExtractor {
     private Collection<Aggregator<?, ?>> getAggregators(PTransform<?, ?> transform) {
       if (transform != null) {
         if (transform instanceof ParDo.Bound) {
-          return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getNewFn());
+          return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getFn());
         } else if (transform instanceof ParDo.BoundMulti) {
           return AggregatorRetriever.getAggregators(
-              ((ParDo.BoundMulti<?, ?>) transform).getNewFn());
+              ((ParDo.BoundMulti<?, ?>) transform).getFn());
         }
       }
       return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index d2149c0..f897f82 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -745,7 +745,7 @@ public class ParDo {
     @Override
     public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
       checkArgument(
-          !isSplittable(getNewFn()),
+          !isSplittable(getFn()),
           "%s does not support Splittable DoFn",
           input.getPipeline().getOptions().getRunner().getName());
       validateWindowType(input, fn);
@@ -753,7 +753,7 @@ public class ParDo {
               input.getPipeline(),
               input.getWindowingStrategy(),
               input.isBounded())
-          .setTypeDescriptor(getNewFn().getOutputTypeDescriptor());
+          .setTypeDescriptor(getFn().getOutputTypeDescriptor());
     }
 
     @Override
@@ -761,14 +761,14 @@ public class ParDo {
     protected Coder<OutputT> getDefaultOutputCoder(PCollection<? extends InputT> input)
         throws CannotProvideCoderException {
       return input.getPipeline().getCoderRegistry().getDefaultCoder(
-          getNewFn().getOutputTypeDescriptor(),
-          getNewFn().getInputTypeDescriptor(),
+          getFn().getOutputTypeDescriptor(),
+          getFn().getInputTypeDescriptor(),
           ((PCollection<InputT>) input).getCoder());
     }
 
     @Override
     protected String getKindString() {
-      Class<?> clazz = getNewFn().getClass();
+      Class<?> clazz = getFn().getClass();
       if (clazz.isAnonymousClass()) {
         return "AnonymousParDo";
       } else {
@@ -789,7 +789,7 @@ public class ParDo {
       ParDo.populateDisplayData(builder, (HasDisplayData) fn, fnDisplayData);
     }
 
-    public DoFn<InputT, OutputT> getNewFn() {
+    public DoFn<InputT, OutputT> getFn() {
       return fn;
     }
 
@@ -952,7 +952,7 @@ public class ParDo {
     @Override
     public PCollectionTuple expand(PCollection<? extends InputT> input) {
       checkArgument(
-          !isSplittable(getNewFn()),
+          !isSplittable(getFn()),
           "%s does not support Splittable DoFn",
           input.getPipeline().getOptions().getRunner().getName());
       validateWindowType(input, fn);
@@ -965,7 +965,7 @@ public class ParDo {
       // The fn will likely be an instance of an anonymous subclass
       // such as DoFn<Integer, String> { }, thus will have a high-fidelity
       // TypeDescriptor for the output type.
-      outputs.get(mainOutputTag).setTypeDescriptor(getNewFn().getOutputTypeDescriptor());
+      outputs.get(mainOutputTag).setTypeDescriptor(getFn().getOutputTypeDescriptor());
 
       return outputs;
     }
@@ -984,13 +984,13 @@ public class ParDo {
       Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
       return input.getPipeline().getCoderRegistry().getDefaultCoder(
           output.getTypeDescriptor(),
-          getNewFn().getInputTypeDescriptor(),
+          getFn().getInputTypeDescriptor(),
           inputCoder);
       }
 
     @Override
     protected String getKindString() {
-      Class<?> clazz = getNewFn().getClass();
+      Class<?> clazz = getFn().getClass();
       if (clazz.isAnonymousClass()) {
         return "AnonymousParMultiDo";
       } else {
@@ -1004,7 +1004,7 @@ public class ParDo {
       ParDo.populateDisplayData(builder, fn, fnDisplayData);
     }
 
-    public DoFn<InputT, OutputT> getNewFn() {
+    public DoFn<InputT, OutputT> getFn() {
       return fn;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
index c4e9b8a..1bf2c3d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
@@ -68,7 +68,7 @@ public class AggregatorPipelineExtractorTest {
     @SuppressWarnings("rawtypes")
     ParDo.Bound bound = mock(ParDo.Bound.class, "Bound");
     AggregatorProvidingDoFn<ThreadGroup, StrictMath> fn = new AggregatorProvidingDoFn<>();
-    when(bound.getNewFn()).thenReturn(fn);
+    when(bound.getFn()).thenReturn(fn);
 
     Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
     Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(new Min.MinIntegerFn());
@@ -96,7 +96,7 @@ public class AggregatorPipelineExtractorTest {
     @SuppressWarnings("rawtypes")
     ParDo.BoundMulti bound = mock(ParDo.BoundMulti.class, "BoundMulti");
     AggregatorProvidingDoFn<Object, Void> fn = new AggregatorProvidingDoFn<>();
-    when(bound.getNewFn()).thenReturn(fn);
+    when(bound.getFn()).thenReturn(fn);
 
     Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Max.MaxLongFn());
     Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
@@ -126,8 +126,8 @@ public class AggregatorPipelineExtractorTest {
     @SuppressWarnings("rawtypes")
     ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound");
     AggregatorProvidingDoFn<String, Math> fn = new AggregatorProvidingDoFn<>();
-    when(bound.getNewFn()).thenReturn(fn);
-    when(otherBound.getNewFn()).thenReturn(fn);
+    when(bound.getFn()).thenReturn(fn);
+    when(otherBound.getFn()).thenReturn(fn);
 
     Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
     Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
@@ -162,7 +162,7 @@ public class AggregatorPipelineExtractorTest {
     AggregatorProvidingDoFn<ThreadGroup, Void> fn = new AggregatorProvidingDoFn<>();
     Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
 
-    when(bound.getNewFn()).thenReturn(fn);
+    when(bound.getFn()).thenReturn(fn);
 
     @SuppressWarnings("rawtypes")
     ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound");
@@ -170,7 +170,7 @@ public class AggregatorPipelineExtractorTest {
     AggregatorProvidingDoFn<Long, Long> otherFn = new AggregatorProvidingDoFn<>();
     Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(new Sum.SumDoubleFn());
 
-    when(otherBound.getNewFn()).thenReturn(otherFn);
+    when(otherBound.getFn()).thenReturn(otherFn);
 
     TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
     when(transformNode.getTransform()).thenReturn(bound);


[07/10] incubator-beam git commit: Removes ArgumentProvider.windowingInternals

Posted by ke...@apache.org.
Removes ArgumentProvider.windowingInternals


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f3e8a038
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f3e8a038
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f3e8a038

Branch: refs/heads/master
Commit: f3e8a0383bf9cb3f9452e0364f7deba113cadff9
Parents: a22de15
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 17:23:15 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Dec 15 13:58:43 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     | 57 --------------------
 .../beam/runners/core/SplittableParDo.java      |  7 ---
 .../beam/sdk/transforms/DoFnAdapters.java       | 14 -----
 .../apache/beam/sdk/transforms/DoFnTester.java  |  7 ---
 .../sdk/transforms/reflect/DoFnInvoker.java     | 20 -------
 .../transforms/reflect/DoFnInvokersTest.java    |  6 ---
 6 files changed, 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3e8a038/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index af7f5ca..041cdde 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -52,13 +52,10 @@ import org.apache.beam.sdk.util.ExecutionContext.StepContext;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateSpec;
@@ -420,11 +417,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      throw new UnsupportedOperationException("WindowingInternals are unsupported.");
-    }
-
-    @Override
     public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
       throw new UnsupportedOperationException(
           "Cannot access RestrictionTracker outside of @ProcessElement method.");
@@ -634,54 +626,5 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       throw new UnsupportedOperationException("Timer parameters are not supported.");
     }
 
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return new WindowingInternals<InputT, OutputT>() {
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          return windowedValue.getWindows();
-        }
-
-        @Override
-        public PaneInfo pane() {
-          return windowedValue.getPane();
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return context.stepContext.timerInternals();
-        }
-
-        @Override
-        public StateInternals<?> stateInternals() {
-          return stepContext.stateInternals();
-        }
-
-        @Override
-        public void outputWindowedValue(
-            OutputT output,
-            Instant timestamp,
-            Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {
-          throw new UnsupportedOperationException("A DoFn cannot output to a different window");
-        }
-
-        @Override
-        public <SideOutputT> void sideOutputWindowedValue(
-            TupleTag<SideOutputT> tag,
-            SideOutputT output,
-            Instant timestamp,
-            Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {
-          throw new UnsupportedOperationException(
-              "A DoFn cannot side output to a different window");
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
-          return context.sideInput(view, sideInputWindow);
-        }
-      };
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3e8a038/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 8a9bfcd..720db63 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -53,7 +53,6 @@ import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateInternals;
@@ -685,12 +684,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       }
 
       @Override
-      public WindowingInternals<InputT, OutputT> windowingInternals() {
-        // DoFnSignatures should have verified that this DoFn doesn't access extra context.
-        throw new IllegalStateException("Unexpected extra context access on a splittable DoFn");
-      }
-
-      @Override
       public TrackerT restrictionTracker() {
         return tracker;
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3e8a038/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index d1c40a6..0a71faa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -202,14 +201,6 @@ public class DoFnAdapters {
     }
 
     @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      // The OldDoFn doesn't allow us to ask for these outside ProcessElements, so this
-      // should be unreachable.
-      throw new UnsupportedOperationException(
-          "Can only get WindowingInternals in processElement");
-    }
-
-    @Override
     public DoFn.InputProvider<InputT> inputProvider() {
       throw new UnsupportedOperationException("inputProvider() exists only for testing");
     }
@@ -322,11 +313,6 @@ public class DoFnAdapters {
     }
 
     @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return context.windowingInternals();
-    }
-
-    @Override
     public DoFn.InputProvider<InputT> inputProvider() {
       throw new UnsupportedOperationException("inputProvider() exists only for testing");
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3e8a038/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 93b3f59..527d529 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -49,7 +49,6 @@ import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.InMemoryStateInternals;
 import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
@@ -335,12 +334,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
             }
 
             @Override
-            public WindowingInternals<InputT, OutputT> windowingInternals() {
-              throw new UnsupportedOperationException(
-                  "Not expected to access WindowingInternals from a new DoFn");
-            }
-
-            @Override
             public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
               throw new UnsupportedOperationException(
                   "Not expected to access RestrictionTracker from a regular DoFn in DoFnTester");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3e8a038/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 97ac9d3..354578e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -27,11 +27,9 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.DoFn.StartBundle;
 import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.State;
 
 /**
@@ -122,19 +120,6 @@ public interface DoFnInvoker<InputT, OutputT> {
     OutputReceiver<OutputT> outputReceiver();
 
     /**
-     * For migration from {@link OldDoFn} to {@link DoFn}, provide a {@link WindowingInternals} so
-     * an {@link OldDoFn} can be run via {@link DoFnInvoker}.
-     *
-     * <p>This is <i>not</i> exposed via the reflective capabilities of {@link DoFn}.
-     *
-     * @deprecated Please port occurences of {@link OldDoFn} to {@link DoFn}. If they require state
-     *     and timers, they will need to wait for the arrival of those features. Do not introduce
-     *     new uses of this method.
-     */
-    @Deprecated
-    WindowingInternals<InputT, OutputT> windowingInternals();
-
-    /**
      * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with
      * the current {@link ProcessElement} call.
      */
@@ -180,11 +165,6 @@ public interface DoFnInvoker<InputT, OutputT> {
     }
 
     @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return null;
-    }
-
-    @Override
     public State state(String stateId) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3e8a038/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 55b8e7e..4c6bee1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -40,7 +40,6 @@ import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
 import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -51,7 +50,6 @@ import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.util.state.StateSpecs;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -77,18 +75,14 @@ public class DoFnInvokersTest {
   @Mock private IntervalWindow mockWindow;
   @Mock private DoFn.InputProvider<String> mockInputProvider;
   @Mock private DoFn.OutputReceiver<String> mockOutputReceiver;
-  @Mock private WindowingInternals<String, String> mockWindowingInternals;
   @Mock private DoFnInvoker.ArgumentProvider<String, String> mockArgumentProvider;
 
-  @Mock private OldDoFn<String, String> mockOldDoFn;
-
   @Before
   public void setUp() {
     MockitoAnnotations.initMocks(this);
     when(mockArgumentProvider.window()).thenReturn(mockWindow);
     when(mockArgumentProvider.inputProvider()).thenReturn(mockInputProvider);
     when(mockArgumentProvider.outputReceiver()).thenReturn(mockOutputReceiver);
-    when(mockArgumentProvider.windowingInternals()).thenReturn(mockWindowingInternals);
     when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext);
   }