You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/08/17 23:05:25 UTC

[1/4] incubator-beam git commit: Rewrites DoFnReflector to go via DoFnSignature

Repository: incubator-beam
Updated Branches:
  refs/heads/master 46097736b -> 89367cfb1


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
new file mode 100644
index 0000000..1a26df2
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+import com.google.common.reflect.TypeToken;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/** Tests for {@link DoFnSignatures}. */
+@RunWith(JUnit4.class)
+public class DoFnSignaturesTest {
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private static class FakeDoFn extends DoFn<Integer, String> {}
+
+  @SuppressWarnings({"unused"})
+  private void missingProcessContext() {}
+
+  @Test
+  public void testMissingProcessContext() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        getClass().getName()
+            + "#missingProcessContext() must take a ProcessContext<> as its first argument");
+
+    DoFnSignatures.analyzeProcessElementMethod(
+        TypeToken.of(FakeDoFn.class),
+        getClass().getDeclaredMethod("missingProcessContext"),
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  @SuppressWarnings({"unused"})
+  private void badProcessContext(String s) {}
+
+  @Test
+  public void testBadProcessContextType() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        getClass().getName()
+            + "#badProcessContext(String) must take a ProcessContext<> as its first argument");
+
+    DoFnSignatures.analyzeProcessElementMethod(
+        TypeToken.of(FakeDoFn.class),
+        getClass().getDeclaredMethod("badProcessContext", String.class),
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  @SuppressWarnings({"unused"})
+  private void badExtraContext(DoFn<Integer, String>.Context c, int n) {}
+
+  @Test
+  public void testBadExtraContext() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        getClass().getName()
+            + "#badExtraContext(Context, int) must have a single argument of type Context");
+
+    DoFnSignatures.analyzeBundleMethod(
+        TypeToken.of(FakeDoFn.class),
+        getClass().getDeclaredMethod("badExtraContext", DoFn.Context.class, int.class),
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  @SuppressWarnings({"unused"})
+  private void badExtraProcessContext(DoFn<Integer, String>.ProcessContext c, Integer n) {}
+
+  @Test
+  public void testBadExtraProcessContextType() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Integer is not a valid context parameter for method "
+            + getClass().getName()
+            + "#badExtraProcessContext(ProcessContext, Integer)"
+            + ". Should be one of [BoundedWindow]");
+
+    DoFnSignatures.analyzeProcessElementMethod(
+        TypeToken.of(FakeDoFn.class),
+        getClass()
+            .getDeclaredMethod("badExtraProcessContext", DoFn.ProcessContext.class, Integer.class),
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  @SuppressWarnings("unused")
+  private int badReturnType() {
+    return 0;
+  }
+
+  @Test
+  public void testBadReturnType() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(getClass().getName() + "#badReturnType() must have a void return type");
+
+    DoFnSignatures.analyzeProcessElementMethod(
+        TypeToken.of(FakeDoFn.class),
+        getClass().getDeclaredMethod("badReturnType"),
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  @SuppressWarnings("unused")
+  private void goodConcreteTypes(
+      DoFn<Integer, String>.ProcessContext c,
+      DoFn.InputProvider<Integer> input,
+      DoFn.OutputReceiver<String> output) {}
+
+  @Test
+  public void testGoodConcreteTypes() throws Exception {
+    Method method =
+        getClass()
+            .getDeclaredMethod(
+                "goodConcreteTypes",
+                DoFn.ProcessContext.class,
+                DoFn.InputProvider.class,
+                DoFn.OutputReceiver.class);
+    DoFnSignatures.analyzeProcessElementMethod(
+        TypeToken.of(FakeDoFn.class),
+        method,
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  private static class GoodTypeVariables<InputT, OutputT> extends DoFn<InputT, OutputT> {
+    @ProcessElement
+    @SuppressWarnings("unused")
+    public void goodTypeVariables(
+        DoFn<InputT, OutputT>.ProcessContext c,
+        DoFn.InputProvider<InputT> input,
+        DoFn.OutputReceiver<OutputT> output) {}
+  }
+
+  @Test
+  public void testGoodTypeVariables() throws Exception {
+    DoFnSignatures.INSTANCE.getOrParseSignature(GoodTypeVariables.class);
+  }
+
+  private static class IdentityFn<T> extends DoFn<T, T> {
+    @ProcessElement
+    @SuppressWarnings("unused")
+    public void processElement(ProcessContext c, InputProvider<T> input, OutputReceiver<T> output) {
+      c.output(c.element());
+    }
+  }
+
+  private static class IdentityListFn<T> extends IdentityFn<List<T>> {}
+
+  @Test
+  public void testIdentityFnApplied() throws Exception {
+    DoFnSignatures.INSTANCE.getOrParseSignature(new IdentityFn<String>() {}.getClass());
+  }
+
+  @SuppressWarnings("unused")
+  private void badGenericTwoArgs(
+      DoFn<Integer, String>.ProcessContext c,
+      DoFn.InputProvider<Integer> input,
+      DoFn.OutputReceiver<Integer> output) {}
+
+  @Test
+  public void testBadGenericsTwoArgs() throws Exception {
+    Method method =
+        getClass()
+            .getDeclaredMethod(
+                "badGenericTwoArgs",
+                DoFn.ProcessContext.class,
+                DoFn.InputProvider.class,
+                DoFn.OutputReceiver.class);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Wrong type of OutputReceiver parameter "
+            + "for method "
+            + getClass().getName()
+            + "#badGenericTwoArgs(ProcessContext, InputProvider, OutputReceiver): "
+            + "OutputReceiver<Integer>, should be "
+            + "OutputReceiver<String>");
+
+    DoFnSignatures.analyzeProcessElementMethod(
+        TypeToken.of(FakeDoFn.class),
+        method,
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  @SuppressWarnings("unused")
+  private void badGenericWildCards(
+      DoFn<Integer, String>.ProcessContext c,
+      DoFn.InputProvider<Integer> input,
+      DoFn.OutputReceiver<? super Integer> output) {}
+
+  @Test
+  public void testBadGenericWildCards() throws Exception {
+    Method method =
+        getClass()
+            .getDeclaredMethod(
+                "badGenericWildCards",
+                DoFn.ProcessContext.class,
+                DoFn.InputProvider.class,
+                DoFn.OutputReceiver.class);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Wrong type of OutputReceiver parameter for method "
+            + getClass().getName()
+            + "#badGenericWildCards(ProcessContext, InputProvider, OutputReceiver): "
+            + "OutputReceiver<? super Integer>, should be "
+            + "OutputReceiver<String>");
+
+    DoFnSignatures.analyzeProcessElementMethod(
+        TypeToken.of(FakeDoFn.class),
+        method,
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  static class BadTypeVariables<InputT, OutputT> extends DoFn<InputT, OutputT> {
+    @ProcessElement
+    @SuppressWarnings("unused")
+    public void badTypeVariables(
+        DoFn<InputT, OutputT>.ProcessContext c,
+        DoFn.InputProvider<InputT> input,
+        DoFn.OutputReceiver<InputT> output) {}
+  }
+
+  @Test
+  public void testBadTypeVariables() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Wrong type of OutputReceiver parameter for method "
+            + BadTypeVariables.class.getName()
+            + "#badTypeVariables(ProcessContext, InputProvider, OutputReceiver): "
+            + "OutputReceiver<InputT>, should be "
+            + "OutputReceiver<OutputT>");
+
+    DoFnSignatures.INSTANCE.getOrParseSignature(BadTypeVariables.class);
+  }
+
+  @Test
+  public void testNoProcessElement() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("No method annotated with @ProcessElement found");
+    thrown.expectMessage(getClass().getName() + "$");
+    DoFnSignatures.INSTANCE.getOrParseSignature(new DoFn<String, String>() {}.getClass());
+  }
+
+  @Test
+  public void testMultipleProcessElement() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Found multiple methods annotated with @ProcessElement");
+    thrown.expectMessage("foo()");
+    thrown.expectMessage("bar()");
+    thrown.expectMessage(getClass().getName() + "$");
+    DoFnSignatures.INSTANCE.getOrParseSignature(
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void foo() {}
+
+          @ProcessElement
+          public void bar() {}
+        }.getClass());
+  }
+
+  @Test
+  public void testMultipleStartBundleElement() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Found multiple methods annotated with @StartBundle");
+    thrown.expectMessage("bar()");
+    thrown.expectMessage("baz()");
+    thrown.expectMessage(getClass().getName() + "$");
+    DoFnSignatures.INSTANCE.getOrParseSignature(
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void foo() {}
+
+          @StartBundle
+          public void bar() {}
+
+          @StartBundle
+          public void baz() {}
+        }.getClass());
+  }
+
+  @Test
+  public void testMultipleFinishBundleMethods() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Found multiple methods annotated with @FinishBundle");
+    thrown.expectMessage("bar(Context)");
+    thrown.expectMessage("baz(Context)");
+    thrown.expectMessage(getClass().getName() + "$");
+    DoFnSignatures.INSTANCE.getOrParseSignature(
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void foo(ProcessContext context) {}
+
+          @FinishBundle
+          public void bar(Context context) {}
+
+          @FinishBundle
+          public void baz(Context context) {}
+        }.getClass());
+  }
+
+  @Test
+  public void testPrivateProcessElement() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("process() must be public");
+    thrown.expectMessage(getClass().getName() + "$");
+    DoFnSignatures.INSTANCE.getOrParseSignature(
+        new DoFn<String, String>() {
+          @ProcessElement
+          private void process() {}
+        }.getClass());
+  }
+
+  @Test
+  public void testPrivateStartBundle() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("startBundle() must be public");
+    thrown.expectMessage(getClass().getName() + "$");
+    DoFnSignatures.INSTANCE.getOrParseSignature(
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement() {}
+
+          @StartBundle
+          void startBundle() {}
+        }.getClass());
+  }
+
+  @Test
+  public void testPrivateFinishBundle() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("finishBundle() must be public");
+    thrown.expectMessage(getClass().getName() + "$");
+    DoFnSignatures.INSTANCE.getOrParseSignature(
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement() {}
+
+          @FinishBundle
+          void finishBundle() {}
+        }.getClass());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
new file mode 100644
index 0000000..a574ed8
--- /dev/null
+++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.microbenchmarks.transforms;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Benchmarks for {@link OldDoFn} and {@link DoFn} invocations, specifically for measuring the
+ * overhead of {@link DoFnInvokers}.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Warmup(iterations = 5)
+public class DoFnInvokersBenchmark {
+
+  private static final String ELEMENT = "some string to use for testing";
+
+  private OldDoFn<String, String> oldDoFn = new UpperCaseOldDoFn();
+  private DoFn<String, String> doFn = new UpperCaseDoFn();
+
+  private StubOldDoFnProcessContext stubOldDoFnContext =
+      new StubOldDoFnProcessContext(oldDoFn, ELEMENT);
+  private StubDoFnProcessContext stubDoFnContext = new StubDoFnProcessContext(doFn, ELEMENT);
+  private ExtraContextFactory<String, String> extraContextFactory =
+      new DoFn.FakeExtraContextFactory<>();
+
+  private OldDoFn<String, String> adaptedDoFnWithContext;
+
+  private DoFnInvoker<String, String> invoker;
+
+  @Setup
+  public void setUp() {
+    adaptedDoFnWithContext = DoFnAdapters.toOldDoFn(doFn);
+    invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(doFn);
+  }
+
+  @Benchmark
+  public String invokeOldDoFn() throws Exception {
+    oldDoFn.processElement(stubOldDoFnContext);
+    return stubOldDoFnContext.output; // output of the context that was just processed
+  }
+
+  @Benchmark
+  public String invokeDoFnWithContextViaAdaptor() throws Exception {
+    adaptedDoFnWithContext.processElement(stubOldDoFnContext);
+    return stubOldDoFnContext.output;
+  }
+
+  @Benchmark
+  public String invokeDoFnWithContext() throws Exception {
+    invoker.invokeProcessElement(stubDoFnContext, extraContextFactory);
+    return stubDoFnContext.output;
+  }
+
+  private static class UpperCaseOldDoFn extends OldDoFn<String, String> {
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(c.element().toUpperCase());
+    }
+  }
+
+  private static class UpperCaseDoFn extends DoFn<String, String> {
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(c.element().toUpperCase());
+    }
+  }
+
+  private static class StubOldDoFnProcessContext extends OldDoFn<String, String>.ProcessContext {
+
+    private final String element;
+    private String output;
+
+    public StubOldDoFnProcessContext(OldDoFn<String, String> fn, String element) {
+      fn.super();
+      this.element = element;
+    }
+
+    @Override
+    public String element() {
+      return element;
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      return null;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return null;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return null;
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return null;
+    }
+
+    @Override
+    public WindowingInternals<String, String> windowingInternals() {
+      return null;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return null;
+    }
+
+    @Override
+    public void output(String output) {
+      this.output = output;
+    }
+
+    @Override
+    public void outputWithTimestamp(String output, Instant timestamp) {
+      output(output);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {}
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {}
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      return null;
+    }
+  }
+
+  private static class StubDoFnProcessContext extends DoFn<String, String>.ProcessContext {
+    private final String element;
+    private String output;
+
+    public StubDoFnProcessContext(DoFn<String, String> fn, String element) {
+      fn.super();
+      this.element = element;
+    }
+
+    @Override
+    public String element() {
+      return element;
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      return null;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return null;
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return null;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return null;
+    }
+
+    @Override
+    public void output(String output) {
+      this.output = output;
+    }
+
+    @Override
+    public void outputWithTimestamp(String output, Instant timestamp) {
+      output(output);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {}
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {}
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
deleted file mode 100644
index 91ecd16..0000000
--- a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.microbenchmarks.transforms;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
-import org.apache.beam.sdk.transforms.DoFnReflector;
-import org.apache.beam.sdk.transforms.DoFnReflector.DoFnInvoker;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-
-import org.joda.time.Instant;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.Warmup;
-
-/**
- * Benchmarks for {@link OldDoFn} and {@link DoFn} invocations, specifically
- * for measuring the overhead of {@link DoFnReflector}.
- */
-@State(Scope.Benchmark)
-@Fork(1)
-@Warmup(iterations = 5)
-public class DoFnReflectorBenchmark {
-
-  private static final String ELEMENT = "some string to use for testing";
-
-  private OldDoFn<String, String> oldDoFn = new UpperCaseOldDoFn();
-  private DoFn<String, String> doFn = new UpperCaseDoFn();
-
-  private StubOldDoFnProcessContext stubOldDoFnContext = new StubOldDoFnProcessContext(oldDoFn,
-      ELEMENT);
-  private StubDoFnProcessContext stubDoFnContext =
-      new StubDoFnProcessContext(doFn, ELEMENT);
-  private ExtraContextFactory<String, String> extraContextFactory =
-      new DoFn.FakeExtraContextFactory<>();
-
-  private DoFnReflector doFnReflector;
-  private OldDoFn<String, String> adaptedDoFnWithContext;
-
-  private DoFnInvoker<String, String> invoker;
-
-  @Setup
-  public void setUp() {
-    doFnReflector = DoFnReflector.of(doFn.getClass());
-    adaptedDoFnWithContext = doFnReflector.toDoFn(doFn);
-    invoker = doFnReflector.bindInvoker(doFn);
-  }
-
-  @Benchmark
-  public String invokeOldDoFn() throws Exception {
-    oldDoFn.processElement(stubOldDoFnContext);
-    return stubDoFnContext.output;
-  }
-
-  @Benchmark
-  public String invokeDoFnWithContextViaAdaptor() throws Exception {
-    adaptedDoFnWithContext.processElement(stubOldDoFnContext);
-    return stubOldDoFnContext.output;
-  }
-
-  @Benchmark
-  public String invokeDoFnWithContext() throws Exception {
-    invoker.invokeProcessElement(stubDoFnContext, extraContextFactory);
-    return stubDoFnContext.output;
-  }
-
-  private static class UpperCaseOldDoFn extends OldDoFn<String, String> {
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      c.output(c.element().toUpperCase());
-    }
-  }
-
-  private static class UpperCaseDoFn extends DoFn<String, String> {
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      c.output(c.element().toUpperCase());
-    }
-  }
-
-  private static class StubOldDoFnProcessContext extends OldDoFn<String, String>.ProcessContext {
-
-    private final String element;
-    private String output;
-
-    public StubOldDoFnProcessContext(OldDoFn<String, String> fn, String element) {
-      fn.super();
-      this.element = element;
-    }
-
-    @Override
-    public String element() {
-      return element;
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      return null;
-    }
-
-    @Override
-    public Instant timestamp() {
-      return null;
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return null;
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return null;
-    }
-
-    @Override
-    public WindowingInternals<String, String> windowingInternals() {
-      return null;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return null;
-    }
-
-    @Override
-    public void output(String output) {
-      this.output = output;
-    }
-
-    @Override
-    public void outputWithTimestamp(String output, Instant timestamp) {
-      output(output);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-        createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return null;
-    }
-  }
-
-  private static class StubDoFnProcessContext
-      extends DoFn<String, String>.ProcessContext {
-    private final String element;
-    private  String output;
-
-    public StubDoFnProcessContext(DoFn<String, String> fn, String element) {
-      fn.super();
-      this.element = element;
-    }
-
-    @Override
-    public String element() {
-      return element;
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      return null;
-    }
-
-    @Override
-    public Instant timestamp() {
-      return null;
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return null;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return null;
-    }
-
-    @Override
-    public void output(String output) {
-      this.output = output;
-    }
-
-    @Override
-    public void outputWithTimestamp(String output, Instant timestamp) {
-      output(output);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-    }
-  }
-}


[2/4] incubator-beam git commit: Rewrites DoFnReflector to go via DoFnSignature

Posted by bc...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
new file mode 100644
index 0000000..6730140
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+import com.google.auto.value.AutoValue;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+/**
+ * Describes the signature of a {@link DoFn}, in particular, which features it uses, which extra
+ * context it requires, types of the input and output elements, etc.
+ *
+ * <p>See <a href="https://s.apache.org/a-new-dofn">A new DoFn</a>.
+ */
+@AutoValue
+public abstract class DoFnSignature {
+  public abstract Class<? extends DoFn> fnClass();
+
+  public abstract ProcessElementMethod processElement();
+
+  @Nullable
+  public abstract BundleMethod startBundle();
+
+  @Nullable
+  public abstract BundleMethod finishBundle();
+
+  @Nullable
+  public abstract LifecycleMethod setup();
+
+  @Nullable
+  public abstract LifecycleMethod teardown();
+
+  static DoFnSignature create(
+      Class<? extends DoFn> fnClass,
+      ProcessElementMethod processElement,
+      @Nullable BundleMethod startBundle,
+      @Nullable BundleMethod finishBundle,
+      @Nullable LifecycleMethod setup,
+      @Nullable LifecycleMethod teardown) {
+    return new AutoValue_DoFnSignature(
+        fnClass,
+        processElement,
+        startBundle,
+        finishBundle,
+        setup,
+        teardown);
+  }
+
+  /** Describes a {@link DoFn.ProcessElement} method. */
+  @AutoValue
+  public abstract static class ProcessElementMethod {
+    enum Parameter {
+      BOUNDED_WINDOW,
+      INPUT_PROVIDER,
+      OUTPUT_RECEIVER
+    }
+
+    public abstract Method targetMethod();
+
+    public abstract List<Parameter> extraParameters();
+
+    static ProcessElementMethod create(Method targetMethod, List<Parameter> extraParameters) {
+      return new AutoValue_DoFnSignature_ProcessElementMethod(
+          targetMethod, Collections.unmodifiableList(extraParameters));
+    }
+
+    /** Returns true if {@link #extraParameters()} contains {@link Parameter#BOUNDED_WINDOW}. */
+    public boolean usesSingleWindow() {
+      return extraParameters().contains(Parameter.BOUNDED_WINDOW);
+    }
+  }
+
+  /** Describes a {@link DoFn.StartBundle} or {@link DoFn.FinishBundle} method. */
+  @AutoValue
+  public abstract static class BundleMethod {
+    public abstract Method targetMethod();
+
+    static BundleMethod create(Method targetMethod) {
+      return new AutoValue_DoFnSignature_BundleMethod(targetMethod);
+    }
+  }
+
+  /** Describes a {@link DoFn.Setup} or {@link DoFn.Teardown} method. */
+  @AutoValue
+  public abstract static class LifecycleMethod {
+    public abstract Method targetMethod();
+
+    static LifecycleMethod create(Method targetMethod) {
+      return new AutoValue_DoFnSignature_LifecycleMethod(targetMethod);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
new file mode 100644
index 0000000..80b3b4f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.reflect.TypeParameter;
+import com.google.common.reflect.TypeToken;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parses a {@link DoFn} and computes its {@link DoFnSignature}. See {@link #getOrParseSignature}.
+ */
+public class DoFnSignatures {
+  /** The single shared instance; the constructor is private, so no other instance is created. */
+  public static final DoFnSignatures INSTANCE = new DoFnSignatures();
+
+  /** Private: use {@link #INSTANCE}. */
+  private DoFnSignatures() {}
+
+  /**
+   * Signatures parsed so far, keyed by {@link DoFn} class. Read and written only by the
+   * synchronized {@link #getOrParseSignature}.
+   */
+  private final Map<Class<?>, DoFnSignature> signatureCache = new LinkedHashMap<>();
+
+  /**
+   * Looks up the signature of the given {@link DoFn} class, parsing and caching it the first time
+   * the class is requested.
+   *
+   * @return the {@link DoFnSignature} for the given {@link DoFn}.
+   */
+  public synchronized DoFnSignature getOrParseSignature(
+      @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) {
+    DoFnSignature cached = signatureCache.get(fn);
+    if (cached != null) {
+      return cached;
+    }
+    // First request for this class: parse it and remember the result.
+    DoFnSignature parsed = parseSignature(fn);
+    signatureCache.put(fn, parsed);
+    return parsed;
+  }
+
+  /**
+   * Analyzes a given {@link DoFn} class and extracts its {@link DoFnSignature}.
+   *
+   * <p>The input and output types are read from the type arguments of the {@link DoFn} supertype.
+   * The {@link DoFn.ProcessElement} method is required; the {@link DoFn.StartBundle},
+   * {@link DoFn.FinishBundle}, {@link DoFn.Setup} and {@link DoFn.Teardown} methods are optional
+   * and are passed to {@link DoFnSignature#create} as {@code null} when absent.
+   *
+   * @param fnClass the {@link DoFn} subclass to analyze
+   * @return the signature describing the annotated methods of {@code fnClass}
+   * @throws IllegalArgumentException if {@code fnClass} is not a subtype of {@link DoFn}, extends
+   *     {@link DoFn} as a raw type, or declares an invalid annotated method
+   */
+  private static DoFnSignature parseSignature(Class<? extends DoFn> fnClass) {
+    TypeToken<?> inputT = null;
+    TypeToken<?> outputT = null;
+
+    // Extract the input and output type.
+    checkArgument(
+        DoFn.class.isAssignableFrom(fnClass),
+        "%s must be subtype of DoFn",
+        fnClass.getSimpleName());
+    TypeToken<? extends DoFn> fnToken = TypeToken.of(fnClass);
+    for (TypeToken<?> supertype : fnToken.getTypes()) {
+      if (!supertype.getRawType().equals(DoFn.class)) {
+        continue;
+      }
+      Type doFnType = supertype.getType();
+      // A class that extends DoFn as a raw type has no type arguments to read. Reject it with a
+      // descriptive error instead of letting the cast below fail with a ClassCastException.
+      checkArgument(
+          doFnType instanceof ParameterizedType,
+          "Unable to determine input and output types from %s: DoFn is extended as a raw type",
+          fnClass);
+      Type[] args = ((ParameterizedType) doFnType).getActualTypeArguments();
+      inputT = TypeToken.of(args[0]);
+      outputT = TypeToken.of(args[1]);
+    }
+    checkNotNull(inputT, "Unable to determine input type from %s", fnClass);
+
+    Method processElementMethod = findAnnotatedMethod(DoFn.ProcessElement.class, fnClass, true);
+    Method startBundleMethod = findAnnotatedMethod(DoFn.StartBundle.class, fnClass, false);
+    Method finishBundleMethod = findAnnotatedMethod(DoFn.FinishBundle.class, fnClass, false);
+    Method setupMethod = findAnnotatedMethod(DoFn.Setup.class, fnClass, false);
+    Method teardownMethod = findAnnotatedMethod(DoFn.Teardown.class, fnClass, false);
+
+    return DoFnSignature.create(
+        fnClass,
+        analyzeProcessElementMethod(fnToken, processElementMethod, inputT, outputT),
+        (startBundleMethod == null)
+            ? null
+            : analyzeBundleMethod(fnToken, startBundleMethod, inputT, outputT),
+        (finishBundleMethod == null)
+            ? null
+            : analyzeBundleMethod(fnToken, finishBundleMethod, inputT, outputT),
+        (setupMethod == null) ? null : analyzeLifecycleMethod(setupMethod),
+        (teardownMethod == null) ? null : analyzeLifecycleMethod(teardownMethod));
+  }
+
+  /**
+   * Builds the type token for {@code DoFn<InputT, OutputT>.ProcessContext} with {@code InputT}
+   * and {@code OutputT} bound to the given tokens.
+   */
+  private static <InputT, OutputT>
+      TypeToken<DoFn<InputT, OutputT>.ProcessContext> doFnProcessContextTypeOf(
+          TypeToken<InputT> inputT, TypeToken<OutputT> outputT) {
+    // Capture the generic shape first, then substitute each type variable in turn.
+    TypeToken<DoFn<InputT, OutputT>.ProcessContext> template =
+        new TypeToken<DoFn<InputT, OutputT>.ProcessContext>() {};
+    TypeParameter<InputT> inputParam = new TypeParameter<InputT>() {};
+    TypeParameter<OutputT> outputParam = new TypeParameter<OutputT>() {};
+    return template.where(inputParam, inputT).where(outputParam, outputT);
+  }
+
+  /**
+   * Builds the type token for {@code DoFn<InputT, OutputT>.Context} with {@code InputT} and
+   * {@code OutputT} bound to the given tokens.
+   */
+  private static <InputT, OutputT> TypeToken<DoFn<InputT, OutputT>.Context> doFnContextTypeOf(
+      TypeToken<InputT> inputT, TypeToken<OutputT> outputT) {
+    // Capture the generic shape first, then substitute each type variable in turn.
+    TypeToken<DoFn<InputT, OutputT>.Context> template =
+        new TypeToken<DoFn<InputT, OutputT>.Context>() {};
+    TypeParameter<InputT> inputParam = new TypeParameter<InputT>() {};
+    TypeParameter<OutputT> outputParam = new TypeParameter<OutputT>() {};
+    return template.where(inputParam, inputT).where(outputParam, outputT);
+  }
+
+  /**
+   * Builds the type token for {@code DoFn.InputProvider<InputT>} with {@code InputT} bound to the
+   * given token.
+   */
+  private static <InputT> TypeToken<DoFn.InputProvider<InputT>> inputProviderTypeOf(
+      TypeToken<InputT> inputT) {
+    TypeToken<DoFn.InputProvider<InputT>> template =
+        new TypeToken<DoFn.InputProvider<InputT>>() {};
+    TypeParameter<InputT> inputParam = new TypeParameter<InputT>() {};
+    return template.where(inputParam, inputT);
+  }
+
+  /**
+   * Generates a type token for {@code DoFn.OutputReceiver<OutputT>} given {@code OutputT}.
+   *
+   * @param outputT token for the type to substitute for {@code OutputT}
+   * @return the token for {@code DoFn.OutputReceiver<OutputT>} with {@code OutputT} bound
+   */
+  private static <OutputT> TypeToken<DoFn.OutputReceiver<OutputT>> outputReceiverTypeOf(
+      TypeToken<OutputT> outputT) {
+    return new TypeToken<DoFn.OutputReceiver<OutputT>>() {}.where(
+        new TypeParameter<OutputT>() {}, outputT);
+  }
+
+  /**
+   * Validates a {@link DoFn.ProcessElement} method and describes it.
+   *
+   * <p>The method must return {@code void}, must not be varargs, and its first parameter must
+   * resolve to {@code DoFn<InputT, OutputT>.ProcessContext}. Each further parameter must be a
+   * {@link BoundedWindow}, a {@code DoFn.InputProvider<InputT>} or a
+   * {@code DoFn.OutputReceiver<OutputT>}, and each kind may appear at most once.
+   *
+   * @param fnClass token of the {@link DoFn} class, used to resolve the parameter types of
+   *     {@code m}
+   * @param m the annotated method
+   * @param inputT the input element type of the {@link DoFn}
+   * @param outputT the output element type of the {@link DoFn}
+   * @return the descriptor, listing the extra parameters in declaration order
+   * @throws IllegalArgumentException if any of the rules above is violated
+   */
+  @VisibleForTesting
+  static DoFnSignature.ProcessElementMethod analyzeProcessElementMethod(
+      TypeToken<? extends DoFn> fnClass, Method m, TypeToken<?> inputT, TypeToken<?> outputT) {
+    checkArgument(
+        Void.TYPE.equals(m.getReturnType()), "%s must have a void return type", format(m));
+    checkArgument(!m.isVarArgs(), "%s must not have var args", format(m));
+
+    TypeToken<?> expectedContextT = doFnProcessContextTypeOf(inputT, outputT);
+    Type[] paramTypes = m.getGenericParameterTypes();
+
+    // The first parameter is mandatory and must resolve to exactly the ProcessContext type.
+    TypeToken<?> firstParamT =
+        (paramTypes.length == 0) ? null : fnClass.resolveType(paramTypes[0]);
+    checkArgument(
+        firstParamT != null && firstParamT.equals(expectedContextT),
+        "%s must take a %s as its first argument",
+        format(m),
+        formatType(expectedContextT));
+
+    final DoFnSignature.ProcessElementMethod.Parameter windowKind =
+        DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW;
+    final DoFnSignature.ProcessElementMethod.Parameter inputProviderKind =
+        DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER;
+    final DoFnSignature.ProcessElementMethod.Parameter outputReceiverKind =
+        DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER;
+
+    List<DoFnSignature.ProcessElementMethod.Parameter> extras = new ArrayList<>();
+    TypeToken<?> expectedInputProviderT = inputProviderTypeOf(inputT);
+    TypeToken<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
+
+    // Classify every parameter after the context by its raw type.
+    for (int index = 1; index < paramTypes.length; ++index) {
+      TypeToken<?> paramT = fnClass.resolveType(paramTypes[index]);
+      Class<?> raw = paramT.getRawType();
+
+      if (BoundedWindow.class.equals(raw)) {
+        checkArgument(
+            !extras.contains(windowKind), "Multiple BoundedWindow parameters in %s", format(m));
+        extras.add(windowKind);
+      } else if (DoFn.InputProvider.class.equals(raw)) {
+        checkArgument(
+            !extras.contains(inputProviderKind),
+            "Multiple InputProvider parameters in %s",
+            format(m));
+        // The raw type matches; the type argument must match the DoFn's input type as well.
+        checkArgument(
+            paramT.equals(expectedInputProviderT),
+            "Wrong type of InputProvider parameter for method %s: %s, should be %s",
+            format(m),
+            formatType(paramT),
+            formatType(expectedInputProviderT));
+        extras.add(inputProviderKind);
+      } else if (DoFn.OutputReceiver.class.equals(raw)) {
+        checkArgument(
+            !extras.contains(outputReceiverKind),
+            "Multiple OutputReceiver parameters in %s",
+            format(m));
+        // The raw type matches; the type argument must match the DoFn's output type as well.
+        checkArgument(
+            paramT.equals(expectedOutputReceiverT),
+            "Wrong type of OutputReceiver parameter for method %s: %s, should be %s",
+            format(m),
+            formatType(paramT),
+            formatType(expectedOutputReceiverT));
+        extras.add(outputReceiverKind);
+      } else {
+        // Any other parameter type is rejected unconditionally.
+        List<String> allowedParamTypes =
+            Collections.singletonList(formatType(new TypeToken<BoundedWindow>() {}));
+        checkArgument(
+            false,
+            "%s is not a valid context parameter for method %s. Should be one of %s",
+            formatType(paramT),
+            format(m),
+            allowedParamTypes);
+      }
+    }
+
+    return DoFnSignature.ProcessElementMethod.create(m, extras);
+  }
+
+  /**
+   * Validates a {@link DoFn.StartBundle} or {@link DoFn.FinishBundle} method and describes it.
+   *
+   * <p>The method must return {@code void}, must not be varargs, and must take exactly one
+   * parameter that resolves to {@code DoFn<InputT, OutputT>.Context}.
+   *
+   * @param fnToken token of the {@link DoFn} class, used to resolve the parameter type of
+   *     {@code m}
+   * @param m the annotated method
+   * @param inputT the input element type of the {@link DoFn}
+   * @param outputT the output element type of the {@link DoFn}
+   * @throws IllegalArgumentException if any of the rules above is violated
+   */
+  @VisibleForTesting
+  static DoFnSignature.BundleMethod analyzeBundleMethod(
+      TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT, TypeToken<?> outputT) {
+    checkArgument(
+        Void.TYPE.equals(m.getReturnType()), "%s must have a void return type", format(m));
+    checkArgument(!m.isVarArgs(), "%s must not have var args", format(m));
+
+    TypeToken<?> requiredContextT = doFnContextTypeOf(inputT, outputT);
+    Type[] paramTypes = m.getGenericParameterTypes();
+
+    // Check the arity before touching paramTypes[0].
+    checkArgument(
+        paramTypes.length == 1,
+        "%s must have a single argument of type %s",
+        format(m),
+        formatType(requiredContextT));
+
+    TypeToken<?> actualContextT = fnToken.resolveType(paramTypes[0]);
+    checkArgument(
+        actualContextT.equals(requiredContextT),
+        "Wrong type of context argument to %s: %s, must be %s",
+        format(m),
+        formatType(actualContextT),
+        formatType(requiredContextT));
+
+    return DoFnSignature.BundleMethod.create(m);
+  }
+
+  /**
+   * Validates a {@link DoFn.Setup} or {@link DoFn.Teardown} method, which must return
+   * {@code void} and take no arguments, and describes it.
+   *
+   * @throws IllegalArgumentException if the method returns a value or declares parameters
+   */
+  private static DoFnSignature.LifecycleMethod analyzeLifecycleMethod(Method m) {
+    checkArgument(
+        Void.TYPE.equals(m.getReturnType()), "%s must have a void return type", format(m));
+    Type[] paramTypes = m.getGenericParameterTypes();
+    checkArgument(paramTypes.length == 0, "%s must take zero arguments", format(m));
+    return DoFnSignature.LifecycleMethod.create(m);
+  }
+
+  /**
+   * Collects every method carrying the annotation {@code anno} that is declared on
+   * {@code startClass}, on its superclasses below {@code stopClass}, or on the interfaces those
+   * classes implement.
+   *
+   * <p>Methods declared on classes come first, from {@code startClass} upwards, followed by the
+   * methods found on interfaces.
+   */
+  private static Collection<Method> declaredMethodsWithAnnotation(
+      Class<? extends Annotation> anno, Class<?> startClass, Class<?> stopClass) {
+    Collection<Method> annotated = new ArrayList<>();
+    LinkedHashSet<Class<?>> seenInterfaces = new LinkedHashSet<>();
+
+    // Walk the class chain from startClass up to, but excluding, stopClass.
+    for (Class<?> current = startClass;
+        current != null && !current.equals(stopClass);
+        current = current.getSuperclass()) {
+      for (Method candidate : current.getDeclaredMethods()) {
+        if (candidate.isAnnotationPresent(anno)) {
+          annotated.add(candidate);
+        }
+      }
+      // Remember the directly implemented interfaces; they are scanned after the classes.
+      seenInterfaces.addAll(Arrays.asList(current.getInterfaces()));
+    }
+
+    // Then scan the methods reachable through the collected interfaces.
+    for (Method candidate : ReflectHelpers.getClosureOfMethodsOnInterfaces(seenInterfaces)) {
+      if (candidate.isAnnotationPresent(anno)) {
+        annotated.add(candidate);
+      }
+    }
+    return annotated;
+  }
+
+  /**
+   * Finds the single method of {@code fnClazz} that carries the annotation {@code anno}.
+   *
+   * <p>Several annotated methods are tolerated only when they all share one name and one
+   * parameter list. The first match is returned, and it must be public and non-static.
+   *
+   * @param anno the annotation to look for
+   * @param fnClazz the class to search, together with its supertypes below {@link DoFn}
+   * @param required whether a missing method is an error
+   * @return the annotated method, or {@code null} if there is none and it is not required
+   * @throws IllegalArgumentException if a required method is missing, if differently shaped
+   *     methods carry the annotation, or if the method is not public or is static
+   */
+  private static Method findAnnotatedMethod(
+      Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
+    Collection<Method> candidates = declaredMethodsWithAnnotation(anno, fnClazz, DoFn.class);
+
+    if (candidates.isEmpty()) {
+      checkArgument(
+          !required,
+          "No method annotated with @%s found in %s",
+          anno.getSimpleName(),
+          fnClazz.getName());
+      return null;
+    }
+
+    // With at least one match, every other match must have the same name and parameter types
+    // as the first one (such matches come from parent classes or interfaces).
+    Method chosen = candidates.iterator().next();
+    for (Method candidate : candidates) {
+      boolean sameShape =
+          chosen.getName().equals(candidate.getName())
+              && Arrays.equals(chosen.getParameterTypes(), candidate.getParameterTypes());
+      checkArgument(
+          sameShape,
+          "Found multiple methods annotated with @%s. [%s] and [%s]",
+          anno.getSimpleName(),
+          format(chosen),
+          format(candidate));
+    }
+
+    int modifiers = chosen.getModifiers();
+
+    // We need to be able to call it, so it has to be public...
+    checkArgument(Modifier.isPublic(modifiers), "%s must be public", format(chosen));
+
+    // ...and it has to be an instance method.
+    checkArgument(!Modifier.isStatic(modifiers), "%s must not be static", format(chosen));
+
+    return chosen;
+  }
+
+  /** Renders {@code m} for error messages via {@code ReflectHelpers.CLASS_AND_METHOD_FORMATTER}. */
+  private static String format(Method m) {
+    return ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(m);
+  }
+
+  /**
+   * Renders the type held by {@code t} for error messages via
+   * {@code ReflectHelpers.TYPE_SIMPLE_DESCRIPTION}.
+   */
+  private static String formatType(TypeToken<?> t) {
+    return ReflectHelpers.TYPE_SIMPLE_DESCRIPTION.apply(t.getType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
new file mode 100644
index 0000000..4df5209
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines reflection-based utilities for analyzing {@link org.apache.beam.sdk.transforms.DoFn}'s
+ * and creating {@link org.apache.beam.sdk.transforms.reflect.DoFnSignature}'s and
+ * {@link org.apache.beam.sdk.transforms.reflect.DoFnInvoker}'s from them.
+ */
+package org.apache.beam.sdk.transforms.reflect;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
deleted file mode 100644
index e05e5e2..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
+++ /dev/null
@@ -1,822 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.DoFn.Context;
-import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
-import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
-import org.apache.beam.sdk.transforms.DoFn.Setup;
-import org.apache.beam.sdk.transforms.DoFn.Teardown;
-import org.apache.beam.sdk.transforms.dofnreflector.DoFnReflectorTestHelper;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.UserCodeException;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-import java.lang.reflect.Method;
-
-/**
- * Tests for {@link DoFnReflector}.
- */
-@RunWith(JUnit4.class)
-public class DoFnReflectorTest {
-
-  /**
-   * A convenience struct holding flags that indicate whether a particular method was invoked.
-   */
-  public static class Invocations {
-    public boolean wasProcessElementInvoked = false;
-    public boolean wasStartBundleInvoked = false;
-    public boolean wasFinishBundleInvoked = false;
-    public boolean wasSetupInvoked = false;
-    public boolean wasTeardownInvoked = false;
-    private final String name;
-
-    public Invocations(String name) {
-      this.name = name;
-    }
-  }
-
-  private DoFn<String, String> fn;
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Mock
-  private DoFn<String, String>.ProcessContext mockContext;
-  @Mock
-  private BoundedWindow mockWindow;
-  @Mock
-  private DoFn.InputProvider<String> mockInputProvider;
-  @Mock
-  private DoFn.OutputReceiver<String> mockOutputReceiver;
-
-  private ExtraContextFactory<String, String> extraContextFactory;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-    this.extraContextFactory = new ExtraContextFactory<String, String>() {
-      @Override
-      public BoundedWindow window() {
-        return mockWindow;
-      }
-
-      @Override
-      public DoFn.InputProvider<String> inputProvider() {
-        return mockInputProvider;
-      }
-
-      @Override
-      public DoFn.OutputReceiver<String> outputReceiver() {
-        return mockOutputReceiver;
-      }
-    };
-  }
-
-  private DoFnReflector underTest(DoFn<String, String> fn) {
-    this.fn = fn;
-    return DoFnReflector.of(fn.getClass());
-  }
-
-  private void checkInvokeProcessElementWorks(
-      DoFnReflector r, Invocations... invocations) throws Exception {
-    assertTrue("Need at least one invocation to check", invocations.length >= 1);
-    for (Invocations invocation : invocations) {
-      assertFalse("Should not yet have called processElement on " + invocation.name,
-          invocation.wasProcessElementInvoked);
-    }
-    r.bindInvoker(fn).invokeProcessElement(mockContext, extraContextFactory);
-    for (Invocations invocation : invocations) {
-      assertTrue("Should have called processElement on " + invocation.name,
-          invocation.wasProcessElementInvoked);
-    }
-  }
-
-  private void checkInvokeStartBundleWorks(
-      DoFnReflector r, Invocations... invocations) throws Exception {
-    assertTrue("Need at least one invocation to check", invocations.length >= 1);
-    for (Invocations invocation : invocations) {
-      assertFalse("Should not yet have called startBundle on " + invocation.name,
-          invocation.wasStartBundleInvoked);
-    }
-    r.bindInvoker(fn).invokeStartBundle(mockContext, extraContextFactory);
-    for (Invocations invocation : invocations) {
-      assertTrue("Should have called startBundle on " + invocation.name,
-          invocation.wasStartBundleInvoked);
-    }
-  }
-
-  private void checkInvokeFinishBundleWorks(
-      DoFnReflector r, Invocations... invocations) throws Exception {
-    assertTrue("Need at least one invocation to check", invocations.length >= 1);
-    for (Invocations invocation : invocations) {
-      assertFalse("Should not yet have called finishBundle on " + invocation.name,
-          invocation.wasFinishBundleInvoked);
-    }
-    r.bindInvoker(fn).invokeFinishBundle(mockContext, extraContextFactory);
-    for (Invocations invocation : invocations) {
-      assertTrue("Should have called finishBundle on " + invocation.name,
-          invocation.wasFinishBundleInvoked);
-    }
-  }
-
-  private void checkInvokeSetupWorks(DoFnReflector r, Invocations... invocations) throws Exception {
-    assertTrue("Need at least one invocation to check", invocations.length >= 1);
-    for (Invocations invocation : invocations) {
-      assertFalse("Should not yet have called setup on " + invocation.name,
-          invocation.wasSetupInvoked);
-    }
-    r.bindInvoker(fn).invokeSetup();
-    for (Invocations invocation : invocations) {
-      assertTrue("Should have called setup on " + invocation.name,
-          invocation.wasSetupInvoked);
-    }
-  }
-
-  private void checkInvokeTeardownWorks(DoFnReflector r, Invocations... invocations)
-      throws Exception {
-    assertTrue("Need at least one invocation to check", invocations.length >= 1);
-    for (Invocations invocation : invocations) {
-      assertFalse("Should not yet have called teardown on " + invocation.name,
-          invocation.wasTeardownInvoked);
-    }
-    r.bindInvoker(fn).invokeTeardown();
-    for (Invocations invocation : invocations) {
-      assertTrue("Should have called teardown on " + invocation.name,
-          invocation.wasTeardownInvoked);
-    }
-  }
-
-  @Test
-  public void testDoFnWithNoExtraContext() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFnReflector reflector = underTest(new DoFn<String, String>() {
-
-      @ProcessElement
-      public void processElement(ProcessContext c)
-          throws Exception {
-        invocations.wasProcessElementInvoked = true;
-        assertSame(c, mockContext);
-      }
-    });
-
-    assertFalse(reflector.usesSingleWindow());
-
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testDoFnInvokersReused() throws Exception {
-    // Ensures that we don't create a new Invoker class for every instance of the OldDoFn.
-    IdentityParent fn1 = new IdentityParent();
-    IdentityParent fn2 = new IdentityParent();
-    DoFnReflector reflector1 = underTest(fn1);
-    DoFnReflector reflector2 = underTest(fn2);
-    assertSame("DoFnReflector instances should be cached and reused for identical types",
-        reflector1, reflector2);
-    assertSame("Invoker classes should only be generated once for each type",
-        reflector1.bindInvoker(fn1).getClass(),
-        reflector2.bindInvoker(fn2).getClass());
-  }
-
-  interface InterfaceWithProcessElement {
-    @ProcessElement
-    void processElement(DoFn<String, String>.ProcessContext c);
-  }
-
-  interface LayersOfInterfaces extends InterfaceWithProcessElement {}
-
-  private class IdentityUsingInterfaceWithProcessElement
-      extends DoFn<String, String>
-      implements LayersOfInterfaces {
-
-    private Invocations invocations = new Invocations("Named Class");
-
-    @Override
-    public void processElement(DoFn<String, String>.ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-      assertSame(c, mockContext);
-    }
-  }
-
-  @Test
-  public void testDoFnWithProcessElementInterface() throws Exception {
-    IdentityUsingInterfaceWithProcessElement fn = new IdentityUsingInterfaceWithProcessElement();
-    DoFnReflector reflector = underTest(fn);
-    assertFalse(reflector.usesSingleWindow());
-    checkInvokeProcessElementWorks(reflector, fn.invocations);
-  }
-
-  private class IdentityParent extends DoFn<String, String> {
-    protected Invocations parentInvocations = new Invocations("IdentityParent");
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      parentInvocations.wasProcessElementInvoked = true;
-      assertSame(c, mockContext);
-    }
-  }
-
-  private class IdentityChildWithoutOverride extends IdentityParent {
-  }
-
-  private class IdentityChildWithOverride extends IdentityParent {
-    protected Invocations childInvocations = new Invocations("IdentityChildWithOverride");
-
-    @Override
-    public void process(DoFn<String, String>.ProcessContext c) {
-      super.process(c);
-      childInvocations.wasProcessElementInvoked = true;
-    }
-  }
-
-  @Test
-  public void testDoFnWithMethodInSuperclass() throws Exception {
-    IdentityChildWithoutOverride fn = new IdentityChildWithoutOverride();
-    DoFnReflector reflector = underTest(fn);
-    assertFalse(reflector.usesSingleWindow());
-    checkInvokeProcessElementWorks(reflector, fn.parentInvocations);
-  }
-
-  @Test
-  public void testDoFnWithMethodInSubclass() throws Exception {
-    IdentityChildWithOverride fn = new IdentityChildWithOverride();
-    DoFnReflector reflector = underTest(fn);
-    assertFalse(reflector.usesSingleWindow());
-    checkInvokeProcessElementWorks(reflector, fn.parentInvocations, fn.childInvocations);
-  }
-
-  @Test
-  public void testDoFnWithWindow() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFnReflector reflector = underTest(new DoFn<String, String>() {
-
-      @ProcessElement
-      public void processElement(ProcessContext c, BoundedWindow w)
-          throws Exception {
-        invocations.wasProcessElementInvoked = true;
-        assertSame(c, mockContext);
-        assertSame(w, mockWindow);
-      }
-    });
-
-    assertTrue(reflector.usesSingleWindow());
-
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testDoFnWithOutputReceiver() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFnReflector reflector = underTest(new DoFn<String, String>() {
-
-      @ProcessElement
-      public void processElement(ProcessContext c, DoFn.OutputReceiver<String> o)
-          throws Exception {
-        invocations.wasProcessElementInvoked = true;
-        assertSame(c, mockContext);
-        assertSame(o, mockOutputReceiver);
-      }
-    });
-
-    assertFalse(reflector.usesSingleWindow());
-
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testDoFnWithInputProvider() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFnReflector reflector = underTest(new DoFn<String, String>() {
-
-      @ProcessElement
-      public void processElement(ProcessContext c, DoFn.InputProvider<String> i)
-          throws Exception {
-        invocations.wasProcessElementInvoked = true;
-        assertSame(c, mockContext);
-        assertSame(i, mockInputProvider);
-      }
-    });
-
-    assertFalse(reflector.usesSingleWindow());
-
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testDoFnWithStartBundle() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFnReflector reflector = underTest(new DoFn<String, String>() {
-      @ProcessElement
-      public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
-
-      @StartBundle
-      public void startBundle(Context c) {
-        invocations.wasStartBundleInvoked = true;
-        assertSame(c, mockContext);
-      }
-
-      @FinishBundle
-      public void finishBundle(Context c) {
-        invocations.wasFinishBundleInvoked = true;
-        assertSame(c, mockContext);
-      }
-    });
-
-    checkInvokeStartBundleWorks(reflector, invocations);
-    checkInvokeFinishBundleWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testDoFnWithSetupTeardown() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFnReflector reflector = underTest(new DoFn<String, String>() {
-      @ProcessElement
-      public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
-
-      @StartBundle
-      public void startBundle(Context c) {
-        invocations.wasStartBundleInvoked = true;
-        assertSame(c, mockContext);
-      }
-
-      @FinishBundle
-      public void finishBundle(Context c) {
-        invocations.wasFinishBundleInvoked = true;
-        assertSame(c, mockContext);
-      }
-
-      @Setup
-      public void before() {
-        invocations.wasSetupInvoked = true;
-      }
-
-      @Teardown
-      public void after() {
-        invocations.wasTeardownInvoked = true;
-      }
-    });
-
-    checkInvokeSetupWorks(reflector, invocations);
-    checkInvokeTeardownWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testNoProcessElement() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("No method annotated with @ProcessElement found");
-    thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFn<String, String>() {});
-  }
-
-  @Test
-  public void testMultipleProcessElement() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Found multiple methods annotated with @ProcessElement");
-    thrown.expectMessage("foo()");
-    thrown.expectMessage("bar()");
-    thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFn<String, String>() {
-      @ProcessElement
-      public void foo() {}
-
-      @ProcessElement
-      public void bar() {}
-    });
-  }
-
-  @Test
-  public void testMultipleStartBundleElement() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Found multiple methods annotated with @StartBundle");
-    thrown.expectMessage("bar()");
-    thrown.expectMessage("baz()");
-    thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFn<String, String>() {
-      @ProcessElement
-      public void foo() {}
-
-      @StartBundle
-      public void bar() {}
-
-      @StartBundle
-      public void baz() {}
-    });
-  }
-
-  @Test
-  public void testMultipleFinishBundleElement() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Found multiple methods annotated with @FinishBundle");
-    thrown.expectMessage("bar()");
-    thrown.expectMessage("baz()");
-    thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFn<String, String>() {
-      @ProcessElement
-      public void foo() {}
-
-      @FinishBundle
-      public void bar() {}
-
-      @FinishBundle
-      public void baz() {}
-    });
-  }
-
-  private static class PrivateDoFnClass extends DoFn<String, String> {
-    final Invocations invocations = new Invocations(getClass().getName());
-
-    @ProcessElement
-    public void processThis(ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-    }
-  }
-
-  @Test
-  public void testLocalPrivateDoFnClass() throws Exception {
-    PrivateDoFnClass fn = new PrivateDoFnClass();
-    DoFnReflector reflector = underTest(fn);
-    checkInvokeProcessElementWorks(reflector, fn.invocations);
-  }
-
-  @Test
-  public void testStaticPackagePrivateDoFnClass() throws Exception {
-    Invocations invocations = new Invocations("StaticPackagePrivateDoFn");
-    DoFnReflector reflector =
-        underTest(DoFnReflectorTestHelper.newStaticPackagePrivateDoFn(invocations));
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testInnerPackagePrivateDoFnClass() throws Exception {
-    Invocations invocations = new Invocations("InnerPackagePrivateDoFn");
-    DoFnReflector reflector =
-        underTest(new DoFnReflectorTestHelper().newInnerPackagePrivateDoFn(invocations));
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testStaticPrivateDoFnClass() throws Exception {
-    Invocations invocations = new Invocations("StaticPrivateDoFn");
-    DoFnReflector reflector = underTest(DoFnReflectorTestHelper.newStaticPrivateDoFn(invocations));
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testInnerPrivateDoFnClass() throws Exception {
-    Invocations invocations = new Invocations("StaticInnerDoFn");
-    DoFnReflector reflector =
-        underTest(new DoFnReflectorTestHelper().newInnerPrivateDoFn(invocations));
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testAnonymousInnerDoFnInOtherPackage() throws Exception {
-    Invocations invocations = new Invocations("AnonymousInnerDoFnInOtherPackage");
-    DoFnReflector reflector =
-        underTest(new DoFnReflectorTestHelper().newInnerAnonymousDoFn(invocations));
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testStaticAnonymousDoFnInOtherPackage() throws Exception {
-    Invocations invocations = new Invocations("AnonymousStaticDoFnInOtherPackage");
-    DoFnReflector reflector =
-        underTest(DoFnReflectorTestHelper.newStaticAnonymousDoFn(invocations));
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testPrivateProcessElement() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("process() must be public");
-    thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFn<String, String>() {
-      @ProcessElement
-      private void process() {}
-    });
-  }
-
-  @Test
-  public void testPrivateStartBundle() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("startBundle() must be public");
-    thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFn<String, String>() {
-      @ProcessElement
-      public void processElement() {}
-
-      @StartBundle
-      void startBundle() {}
-    });
-  }
-
-  @Test
-  public void testPrivateFinishBundle() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("finishBundle() must be public");
-    thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFn<String, String>() {
-      @ProcessElement
-      public void processElement() {}
-
-      @FinishBundle
-      void finishBundle() {}
-    });
-  }
-
-  @SuppressWarnings({"unused"})
-  private void missingProcessContext() {}
-
-  @Test
-  public void testMissingProcessContext() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(getClass().getName()
-        + "#missingProcessContext() must take a ProcessContext as its first argument");
-
-    DoFnReflector.verifyProcessMethodArguments(
-        getClass().getDeclaredMethod("missingProcessContext"));
-  }
-
-  @SuppressWarnings({"unused"})
-  private void badProcessContext(String s) {}
-
-  @Test
-  public void testBadProcessContextType() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(getClass().getName()
-        + "#badProcessContext(String) must take a ProcessContext as its first argument");
-
-    DoFnReflector.verifyProcessMethodArguments(
-        getClass().getDeclaredMethod("badProcessContext", String.class));
-  }
-
-  @SuppressWarnings({"unused"})
-  private void badExtraContext(DoFn<Integer, String>.Context c, int n) {}
-
-  @Test
-  public void testBadExtraContext() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        "int is not a valid context parameter for method "
-        + getClass().getName() + "#badExtraContext(Context, int). Should be one of [");
-
-    DoFnReflector.verifyBundleMethodArguments(
-        getClass().getDeclaredMethod("badExtraContext", Context.class, int.class));
-  }
-
-  @SuppressWarnings({"unused"})
-  private void badExtraProcessContext(
-      DoFn<Integer, String>.ProcessContext c, Integer n) {}
-
-  @Test
-  public void testBadExtraProcessContextType() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        "Integer is not a valid context parameter for method "
-        + getClass().getName() + "#badExtraProcessContext(ProcessContext, Integer)"
-        + ". Should be one of [BoundedWindow]");
-
-    DoFnReflector.verifyProcessMethodArguments(
-        getClass().getDeclaredMethod("badExtraProcessContext",
-            ProcessContext.class, Integer.class));
-  }
-
-  @SuppressWarnings("unused")
-  private int badReturnType() {
-    return 0;
-  }
-
-  @Test
-  public void testBadReturnType() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(getClass().getName() + "#badReturnType() must have a void return type");
-
-    DoFnReflector.verifyProcessMethodArguments(getClass().getDeclaredMethod("badReturnType"));
-  }
-
-  @SuppressWarnings("unused")
-  private void goodGenerics(
-      DoFn<Integer, String>.ProcessContext c,
-      DoFn.InputProvider<Integer> input,
-      DoFn.OutputReceiver<String> output) {}
-
-  @Test
-  public void testValidGenerics() throws Exception {
-    Method method =
-        getClass()
-            .getDeclaredMethod(
-                "goodGenerics",
-                DoFn.ProcessContext.class,
-                DoFn.InputProvider.class,
-                DoFn.OutputReceiver.class);
-    DoFnReflector.verifyProcessMethodArguments(method);
-  }
-
-  @SuppressWarnings("unused")
-  private void goodWildcards(
-      DoFn<Integer, String>.ProcessContext c,
-      DoFn.InputProvider<?> input,
-      DoFn.OutputReceiver<?> output) {}
-
-  @Test
-  public void testGoodWildcards() throws Exception {
-    Method method =
-        getClass()
-            .getDeclaredMethod(
-                "goodWildcards",
-                DoFn.ProcessContext.class,
-                DoFn.InputProvider.class,
-                DoFn.OutputReceiver.class);
-    DoFnReflector.verifyProcessMethodArguments(method);
-  }
-
-  @SuppressWarnings("unused")
-  private void goodBoundedWildcards(
-      DoFn<Integer, String>.ProcessContext c,
-      DoFn.InputProvider<? super Integer> input,
-      DoFn.OutputReceiver<? super String> output) {}
-
-  @Test
-  public void testGoodBoundedWildcards() throws Exception {
-    Method method =
-        getClass()
-            .getDeclaredMethod(
-                "goodBoundedWildcards",
-                DoFn.ProcessContext.class,
-                DoFn.InputProvider.class,
-                DoFn.OutputReceiver.class);
-    DoFnReflector.verifyProcessMethodArguments(method);
-  }
-
-  @SuppressWarnings("unused")
-  private <InputT, OutputT> void goodTypeVariables(
-      DoFn<InputT, OutputT>.ProcessContext c,
-      DoFn.InputProvider<InputT> input,
-      DoFn.OutputReceiver<OutputT> output) {}
-
-  @Test
-  public void testGoodTypeVariables() throws Exception {
-    Method method =
-        getClass()
-            .getDeclaredMethod(
-                "goodTypeVariables",
-                DoFn.ProcessContext.class,
-                DoFn.InputProvider.class,
-                DoFn.OutputReceiver.class);
-    DoFnReflector.verifyProcessMethodArguments(method);
-  }
-
-  @SuppressWarnings("unused")
-  private void badGenericTwoArgs(
-      DoFn<Integer, String>.ProcessContext c,
-      DoFn.InputProvider<Integer> input,
-      DoFn.OutputReceiver<Integer> output) {}
-
-  @Test
-  public void testBadGenericsTwoArgs() throws Exception {
-    Method method =
-        getClass()
-            .getDeclaredMethod(
-                "badGenericTwoArgs",
-                DoFn.ProcessContext.class,
-                DoFn.InputProvider.class,
-                DoFn.OutputReceiver.class);
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Incompatible generics in context parameter "
-        + "OutputReceiver<Integer> "
-        + "for method " + getClass().getName()
-        + "#badGenericTwoArgs(ProcessContext, InputProvider, OutputReceiver). Should be "
-        + "OutputReceiver<String>");
-
-    DoFnReflector.verifyProcessMethodArguments(method);
-  }
-
-  @SuppressWarnings("unused")
-  private void badGenericWildCards(
-      DoFn<Integer, String>.ProcessContext c,
-      DoFn.InputProvider<Integer> input,
-      DoFn.OutputReceiver<? super Integer> output) {}
-
-  @Test
-  public void testBadGenericWildCards() throws Exception {
-    Method method =
-        getClass()
-            .getDeclaredMethod(
-                "badGenericWildCards",
-                DoFn.ProcessContext.class,
-                DoFn.InputProvider.class,
-                DoFn.OutputReceiver.class);
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Incompatible generics in context parameter "
-        + "OutputReceiver<? super Integer> for method "
-        + getClass().getName()
-        + "#badGenericWildCards(ProcessContext, InputProvider, OutputReceiver). Should be "
-        + "OutputReceiver<String>");
-
-    DoFnReflector.verifyProcessMethodArguments(method);
-  }
-
-  @SuppressWarnings("unused")
-  private <InputT, OutputT> void badTypeVariables(DoFn<InputT, OutputT>.ProcessContext c,
-      DoFn.InputProvider<InputT> input, DoFn.OutputReceiver<InputT> output) {}
-
-  @Test
-  public void testBadTypeVariables() throws Exception {
-    Method method =
-        getClass()
-            .getDeclaredMethod(
-                "badTypeVariables",
-                DoFn.ProcessContext.class,
-                DoFn.InputProvider.class,
-                DoFn.OutputReceiver.class);
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Incompatible generics in context parameter "
-        + "OutputReceiver<InputT> for method " + getClass().getName()
-        + "#badTypeVariables(ProcessContext, InputProvider, OutputReceiver). Should be "
-        + "OutputReceiver<OutputT>");
-
-    DoFnReflector.verifyProcessMethodArguments(method);
-  }
-
-  @Test
-  public void testProcessElementException() throws Exception {
-    DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
-      @ProcessElement
-      public void processElement(@SuppressWarnings("unused") ProcessContext c) {
-        throw new IllegalArgumentException("bogus");
-      }
-    };
-
-    thrown.expect(UserCodeException.class);
-    thrown.expectMessage("bogus");
-    DoFnReflector.of(fn.getClass()).bindInvoker(fn).invokeProcessElement(null, null);
-  }
-
-  @Test
-  public void testStartBundleException() throws Exception {
-    DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
-      @StartBundle
-      public void startBundle(@SuppressWarnings("unused") Context c) {
-        throw new IllegalArgumentException("bogus");
-      }
-
-      @ProcessElement
-      public void processElement(@SuppressWarnings("unused") ProcessContext c) {
-      }
-    };
-
-    thrown.expect(UserCodeException.class);
-    thrown.expectMessage("bogus");
-    DoFnReflector.of(fn.getClass()).bindInvoker(fn).invokeStartBundle(null, null);
-  }
-
-  @Test
-  public void testFinishBundleException() throws Exception {
-    DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
-      @FinishBundle
-      public void finishBundle(@SuppressWarnings("unused") Context c) {
-        throw new IllegalArgumentException("bogus");
-      }
-
-      @ProcessElement
-      public void processElement(@SuppressWarnings("unused") ProcessContext c) {
-      }
-    };
-
-    thrown.expect(UserCodeException.class);
-    thrown.expectMessage("bogus");
-    DoFnReflector.of(fn.getClass()).bindInvoker(fn).invokeFinishBundle(null, null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index 604536b..3469223 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -99,7 +99,7 @@ public class FlattenTest implements Serializable {
     PCollection<String> output =
         makePCollectionListOfStrings(p, inputs)
         .apply(Flatten.<String>pCollections())
-        .apply(ParDo.of(new IdentityFn<String>(){}));
+        .apply(ParDo.of(new IdentityFn<String>()));
 
     PAssert.that(output).containsInAnyOrder(flattenLists(inputs));
     p.run();
@@ -152,7 +152,7 @@ public class FlattenTest implements Serializable {
     PCollection<String> output =
         PCollectionList.<String>empty(p)
         .apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of())
-        .apply(ParDo.of(new IdentityFn<String>(){}));
+        .apply(ParDo.of(new IdentityFn<String>()));
 
     PAssert.that(output).empty();
     p.run();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java
deleted file mode 100644
index 90fba12..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.dofnreflector;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnReflectorTest.Invocations;
-
-/**
- * Test helper for DoFnReflectorTest, which needs to test package-private access
- * to DoFns in other packages.
- */
-public class DoFnReflectorTestHelper {
-
-  private static class StaticPrivateDoFn extends DoFn<String, String> {
-    final Invocations invocations;
-
-    public StaticPrivateDoFn(Invocations invocations) {
-      this.invocations = invocations;
-    }
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-    }
-  }
-
-  private class InnerPrivateDoFn extends DoFn<String, String> {
-    final Invocations invocations;
-
-    public InnerPrivateDoFn(Invocations invocations) {
-      this.invocations = invocations;
-    }
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-    }
-  }
-
-  static class StaticPackagePrivateDoFn extends DoFn<String, String> {
-    final Invocations invocations;
-
-    public StaticPackagePrivateDoFn(Invocations invocations) {
-      this.invocations = invocations;
-    }
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-    }
-  }
-
-  class InnerPackagePrivateDoFn extends DoFn<String, String> {
-    final Invocations invocations;
-
-    public InnerPackagePrivateDoFn(Invocations invocations) {
-      this.invocations = invocations;
-    }
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-    }
-  }
-
-  public static DoFn<String, String> newStaticPackagePrivateDoFn(
-      Invocations invocations) {
-    return new StaticPackagePrivateDoFn(invocations);
-  }
-
-  public DoFn<String, String> newInnerPackagePrivateDoFn(Invocations invocations) {
-    return new InnerPackagePrivateDoFn(invocations);
-  }
-
-  public static DoFn<String, String> newStaticPrivateDoFn(Invocations invocations) {
-    return new StaticPrivateDoFn(invocations);
-  }
-
-  public DoFn<String, String> newInnerPrivateDoFn(Invocations invocations) {
-    return new InnerPrivateDoFn(invocations);
-  }
-
-  public DoFn<String, String> newInnerAnonymousDoFn(final Invocations invocations) {
-    return new DoFn<String, String>() {
-      @ProcessElement
-      public void process(ProcessContext c) {
-        invocations.wasProcessElementInvoked = true;
-      }
-    };
-  }
-
-  public static DoFn<String, String> newStaticAnonymousDoFn(
-      final Invocations invocations) {
-    return new DoFn<String, String>() {
-      @ProcessElement
-      public void process(ProcessContext c) {
-        invocations.wasProcessElementInvoked = true;
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
new file mode 100644
index 0000000..7e756e2
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.UserCodeException;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link DoFnInvokers}. */
+public class DoFnInvokersTest {
+  /** Flags recording which {@link DoFn} methods have been invoked during a test. */
+  public static class Invocations {
+    private final String name; // label shown in assertion failure messages
+    public boolean wasSetupInvoked; // every flag starts out false (Java field default)
+    public boolean wasStartBundleInvoked;
+    public boolean wasProcessElementInvoked;
+    public boolean wasFinishBundleInvoked;
+    public boolean wasTeardownInvoked;
+
+    public Invocations(String name) {
+      this.name = name;
+    }
+  }
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Mock private DoFn.ProcessContext mockContext;
+  @Mock private BoundedWindow mockWindow;
+  @Mock private DoFn.InputProvider<String> mockInputProvider;
+  @Mock private DoFn.OutputReceiver<String> mockOutputReceiver;
+
+  private DoFn.ExtraContextFactory<String, String> extraContextFactory;
+
+  /** Initializes the {@code @Mock} fields and builds a factory that hands those mocks back. */
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    extraContextFactory = new DoFn.ExtraContextFactory<String, String>() {
+      @Override
+      public DoFn.OutputReceiver<String> outputReceiver() {
+        return mockOutputReceiver;
+      }
+
+      @Override
+      public DoFn.InputProvider<String> inputProvider() {
+        return mockInputProvider;
+      }
+
+      @Override
+      public BoundedWindow window() {
+        return mockWindow;
+      }
+    };
+  }
+
+  /** Asserts each tracker's processElement flag is false before and true after one invocation. */
+  private void checkInvokeProcessElementWorks(DoFn<String, String> fn, Invocations... invocations)
+      throws Exception {
+    assertTrue("Need at least one invocation to check", invocations.length > 0);
+    // No tracker may already be marked before the invoker runs.
+    for (Invocations before : invocations) {
+      String notYet = "Should not yet have called processElement on " + before.name;
+      assertFalse(notYet, before.wasProcessElementInvoked);
+    }
+    // A single call through the invoker is expected to mark every supplied tracker.
+    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn)
+        .invokeProcessElement(mockContext, extraContextFactory);
+    for (Invocations after : invocations) {
+      String called = "Should have called processElement on " + after.name;
+      assertTrue(called, after.wasProcessElementInvoked);
+    }
+  }
+
+  /** Asserts each tracker's startBundle flag is false before and true after one invocation. */
+  private void checkInvokeStartBundleWorks(DoFn<String, String> fn, Invocations... invocations)
+      throws Exception {
+    assertTrue("Need at least one invocation to check", invocations.length > 0);
+    for (Invocations before : invocations) {
+      String notYet = "Should not yet have called startBundle on " + before.name;
+      assertFalse(notYet, before.wasStartBundleInvoked);
+    }
+    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeStartBundle(mockContext);
+    for (Invocations after : invocations) {
+      String called = "Should have called startBundle on " + after.name;
+      assertTrue(called, after.wasStartBundleInvoked);
+    }
+  }
+
+  /** Asserts each tracker's finishBundle flag is false before and true after one invocation. */
+  private void checkInvokeFinishBundleWorks(DoFn<String, String> fn, Invocations... invocations)
+      throws Exception {
+    assertTrue("Need at least one invocation to check", invocations.length > 0);
+    for (Invocations before : invocations) {
+      String notYet = "Should not yet have called finishBundle on " + before.name;
+      assertFalse(notYet, before.wasFinishBundleInvoked);
+    }
+    // A single call through the invoker is expected to mark every supplied tracker.
+    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeFinishBundle(mockContext);
+    for (Invocations after : invocations) {
+      String called = "Should have called finishBundle on " + after.name;
+      assertTrue(called, after.wasFinishBundleInvoked);
+    }
+  }
+
+  /** Asserts each tracker's setup flag is false before and true after one invokeSetup call. */
+  private void checkInvokeSetupWorks(DoFn<String, String> fn, Invocations... invocations)
+      throws Exception {
+    assertTrue("Need at least one invocation to check", invocations.length > 0);
+    for (Invocations before : invocations) {
+      assertFalse("Should not yet have called setup on " + before.name, before.wasSetupInvoked);
+    }
+    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeSetup();
+    for (Invocations after : invocations) {
+      assertTrue("Should have called setup on " + after.name, after.wasSetupInvoked);
+    }
+  }
+
+  /** Asserts each tracker's teardown flag is false before and true after one invocation. */
+  private void checkInvokeTeardownWorks(DoFn<String, String> fn, Invocations... invocations)
+      throws Exception {
+    assertTrue("Need at least one invocation to check", invocations.length > 0);
+    for (Invocations before : invocations) {
+      String notYet = "Should not yet have called teardown on " + before.name;
+      assertFalse(notYet, before.wasTeardownInvoked);
+    }
+    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeTeardown();
+    for (Invocations after : invocations) {
+      String called = "Should have called teardown on " + after.name;
+      assertTrue(called, after.wasTeardownInvoked);
+    }
+  }
+
+  /** A ProcessContext-only process method is parsed as not using a single window, then run. */
+  @Test
+  public void testDoFnWithNoExtraContext() throws Exception {
+    final Invocations invocations = new Invocations("AnonymousClass");
+    DoFn<String, String> fn =
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) throws Exception {
+            invocations.wasProcessElementInvoked = true;
+            // JUnit's assertSame takes (expected, actual); the mock is the expected value.
+            assertSame(mockContext, c);
+          }
+        };
+
+    assertFalse(
+        DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass()).processElement()
+            .usesSingleWindow());
+
+    checkInvokeProcessElementWorks(fn, invocations);
+  }
+
+  /** Two instances of one DoFn class must be served by the same generated invoker class. */
+  @Test
+  public void testDoFnInvokersReused() throws Exception {
+    IdentityParent first = new IdentityParent();
+    IdentityParent second = new IdentityParent();
+    Class<?> firstInvokerClass = DoFnInvokers.INSTANCE.newByteBuddyInvoker(first).getClass();
+    Class<?> secondInvokerClass = DoFnInvokers.INSTANCE.newByteBuddyInvoker(second).getClass();
+    String reason = "Invoker classes should only be generated once for each type";
+    assertSame(reason, firstInvokerClass, secondInvokerClass);
+  }
+
+  /** Declares the annotated process method on an interface rather than on a DoFn class. */
+  interface InterfaceWithProcessElement {
+    @DoFn.ProcessElement void processElement(DoFn<String, String>.ProcessContext c);
+  }
+
+  interface LayersOfInterfaces extends InterfaceWithProcessElement {} // adds no members
+
+  /** DoFn whose process method is annotated only on the interface it inherits it from. */
+  private class IdentityUsingInterfaceWithProcessElement extends DoFn<String, String>
+      implements LayersOfInterfaces {
+    private final Invocations invocations = new Invocations("Named Class");
+
+    @Override
+    public void processElement(DoFn<String, String>.ProcessContext c) {
+      invocations.wasProcessElementInvoked = true;
+      assertSame(mockContext, c); // JUnit order is (expected, actual)
+    }
+  }
+
+  /** Checks signature parsing and invocation when the annotation lives on an interface. */
+  @Test
+  public void testDoFnWithProcessElementInterface() throws Exception {
+    IdentityUsingInterfaceWithProcessElement fn = new IdentityUsingInterfaceWithProcessElement();
+    boolean usesWindow =
+        DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass()).processElement()
+            .usesSingleWindow();
+    assertFalse(usesWindow);
+    checkInvokeProcessElementWorks(fn, fn.invocations);
+  }
+
+  /** Base DoFn that records the call and checks that it received the mock context. */
+  private class IdentityParent extends DoFn<String, String> {
+    protected final Invocations parentInvocations = new Invocations("IdentityParent");
+    @ProcessElement
+    public void process(ProcessContext c) {
+      parentInvocations.wasProcessElementInvoked = true;
+      assertSame(mockContext, c); // JUnit order is (expected, actual)
+    }
+  }
+
+  private class IdentityChildWithoutOverride extends IdentityParent {} // inherits process()
+
+  /** Child that overrides process(): delegates to the parent, then marks its own tracker. */
+  private class IdentityChildWithOverride extends IdentityParent {
+    protected Invocations childInvocations = new Invocations("IdentityChildWithOverride");
+    @Override
+    public void process(ProcessContext context) {
+      super.process(context);
+      childInvocations.wasProcessElementInvoked = true;
+    }
+  }
+
+  /** A child declaring no process method of its own must run the parent's annotated one. */
+  @Test
+  public void testDoFnWithMethodInSuperclass() throws Exception {
+    IdentityChildWithoutOverride child = new IdentityChildWithoutOverride();
+    boolean usesWindow =
+        DoFnSignatures.INSTANCE.getOrParseSignature(child.getClass()).processElement()
+            .usesSingleWindow();
+    assertFalse(usesWindow);
+    checkInvokeProcessElementWorks(child, child.parentInvocations);
+  }
+
+  /** An overriding child that calls super must mark both the parent and child trackers. */
+  @Test
+  public void testDoFnWithMethodInSubclass() throws Exception {
+    IdentityChildWithOverride child = new IdentityChildWithOverride();
+    boolean usesWindow =
+        DoFnSignatures.INSTANCE.getOrParseSignature(child.getClass()).processElement()
+            .usesSingleWindow();
+    assertFalse(usesWindow);
+    checkInvokeProcessElementWorks(child, child.parentInvocations, child.childInvocations);
+  }
+
+  /** A {@code BoundedWindow} parameter must mark the signature as using a single window. */
+  @Test
+  public void testDoFnWithWindow() throws Exception {
+    final Invocations invocations = new Invocations("AnonymousClass");
+    DoFn<String, String> fn =
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext c, BoundedWindow w) throws Exception {
+            invocations.wasProcessElementInvoked = true;
+            // JUnit's assertSame takes (expected, actual); the mocks are the expected values.
+            assertSame(mockContext, c);
+            assertSame(mockWindow, w);
+          }
+        };
+
+    assertTrue(
+        DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass()).processElement()
+            .usesSingleWindow());
+
+    checkInvokeProcessElementWorks(fn, invocations);
+  }
+
+  /** An {@code OutputReceiver} parameter gets the factory's receiver and implies no window. */
+  @Test
+  public void testDoFnWithOutputReceiver() throws Exception {
+    final Invocations invocations = new Invocations("AnonymousClass");
+    DoFn<String, String> fn =
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext c, OutputReceiver<String> o) throws Exception {
+            invocations.wasProcessElementInvoked = true;
+            // JUnit's assertSame takes (expected, actual); the mocks are the expected values.
+            assertSame(mockContext, c);
+            assertSame(mockOutputReceiver, o);
+          }
+        };
+
+    assertFalse(
+        DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass()).processElement()
+            .usesSingleWindow());
+
+    checkInvokeProcessElementWorks(fn, invocations);
+  }
+
+  /** An {@code InputProvider} parameter gets the factory's provider and implies no window. */
+  @Test
+  public void testDoFnWithInputProvider() throws Exception {
+    final Invocations invocations = new Invocations("AnonymousClass");
+    DoFn<String, String> fn =
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext c, InputProvider<String> i) throws Exception {
+            invocations.wasProcessElementInvoked = true;
+            // JUnit's assertSame takes (expected, actual); the mocks are the expected values.
+            assertSame(mockContext, c);
+            assertSame(mockInputProvider, i);
+          }
+        };
+
+    assertFalse(
+        DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass()).processElement()
+            .usesSingleWindow());
+
+    checkInvokeProcessElementWorks(fn, invocations);
+  }
+
+  /** Checks that {@code @StartBundle} and {@code @FinishBundle} methods are both invoked. */
+  @Test
+  public void testDoFnWithStartBundle() throws Exception {
+    final Invocations invocations = new Invocations("AnonymousClass");
+    DoFn<String, String> fn =
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+
+          @StartBundle
+          public void startBundle(Context c) {
+            invocations.wasStartBundleInvoked = true;
+            assertSame(mockContext, c); // JUnit order is (expected, actual)
+          }
+
+          @FinishBundle
+          public void finishBundle(Context c) {
+            invocations.wasFinishBundleInvoked = true;
+            assertSame(mockContext, c);
+          }
+        };
+    checkInvokeStartBundleWorks(fn, invocations);
+    checkInvokeFinishBundleWorks(fn, invocations);
+  }
+
+  /** Checks that {@code @Setup} and {@code @Teardown} methods are reached via the invoker. */
+  @Test
+  public void testDoFnWithSetupTeardown() throws Exception {
+    final Invocations invocations = new Invocations("AnonymousClass");
+    DoFn<String, String> fn =
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+
+          @StartBundle
+          public void startBundle(Context c) {
+            invocations.wasStartBundleInvoked = true;
+            assertSame(mockContext, c); // JUnit order is (expected, actual)
+          }
+
+          @FinishBundle
+          public void finishBundle(Context c) {
+            invocations.wasFinishBundleInvoked = true;
+            assertSame(mockContext, c);
+          }
+
+          @Setup
+          public void before() {
+            invocations.wasSetupInvoked = true;
+          }
+
+          @Teardown
+          public void after() {
+            invocations.wasTeardownInvoked = true;
+          }
+        };
+    checkInvokeSetupWorks(fn, invocations);
+    checkInvokeTeardownWorks(fn, invocations);
+  }
+
+  private static class PrivateDoFnClass extends DoFn<String, String> {
+    final Invocations invocations = new Invocations(getClass().getName());
+
+    @ProcessElement
+    public void processThis(ProcessContext c) {
+      invocations.wasProcessElementInvoked = true;
+    }
+  }
+
+  @Test
+  public void testLocalPrivateDoFnClass() throws Exception {
+    PrivateDoFnClass fn = new PrivateDoFnClass();
+    checkInvokeProcessElementWorks(fn, fn.invocations);
+  }
+
+  @Test
+  public void testStaticPackagePrivateDoFnClass() throws Exception {
+    Invocations invocations = new Invocations("StaticPackagePrivateDoFn");
+    checkInvokeProcessElementWorks(
+        DoFnInvokersTestHelper.newStaticPackagePrivateDoFn(invocations), invocations);
+  }
+
+  @Test
+  public void testInnerPackagePrivateDoFnClass() throws Exception {
+    Invocations invocations = new Invocations("InnerPackagePrivateDoFn");
+    checkInvokeProcessElementWorks(
+        new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn(invocations), invocations);
+  }
+
+  @Test
+  public void testStaticPrivateDoFnClass() throws Exception {
+    Invocations invocations = new Invocations("StaticPrivateDoFn");
+    checkInvokeProcessElementWorks(
+        DoFnInvokersTestHelper.newStaticPrivateDoFn(invocations), invocations);
+  }
+
+  @Test
+  public void testInnerPrivateDoFnClass() throws Exception {
+    Invocations invocations = new Invocations("InnerPrivateDoFn");
+    checkInvokeProcessElementWorks(
+        new DoFnInvokersTestHelper().newInnerPrivateDoFn(invocations), invocations);
+  }
+
+  @Test
+  public void testAnonymousInnerDoFnInOtherPackage() throws Exception {
+    Invocations invocations = new Invocations("AnonymousInnerDoFnInOtherPackage");
+    checkInvokeProcessElementWorks(
+        new DoFnInvokersTestHelper().newInnerAnonymousDoFn(invocations), invocations);
+  }
+
+  @Test
+  public void testStaticAnonymousDoFnInOtherPackage() throws Exception {
+    Invocations invocations = new Invocations("AnonymousStaticDoFnInOtherPackage");
+    checkInvokeProcessElementWorks(
+        DoFnInvokersTestHelper.newStaticAnonymousDoFn(invocations), invocations);
+  }
+
+  @Test
+  public void testProcessElementException() throws Exception {
+    DoFn<Integer, Integer> fn =
+        new DoFn<Integer, Integer>() {
+          @ProcessElement
+          public void processElement(@SuppressWarnings("unused") ProcessContext c) {
+            throw new IllegalArgumentException("bogus");
+          }
+        };
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectMessage("bogus");
+    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeProcessElement(null, null);
+  }
+
+  @Test
+  public void testStartBundleException() throws Exception {
+    DoFn<Integer, Integer> fn =
+        new DoFn<Integer, Integer>() {
+          @StartBundle
+          public void startBundle(@SuppressWarnings("unused") Context c) {
+            throw new IllegalArgumentException("bogus");
+          }
+
+          @ProcessElement
+          public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+        };
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectMessage("bogus");
+    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeStartBundle(null);
+  }
+
+  @Test
+  public void testFinishBundleException() throws Exception {
+    DoFn<Integer, Integer> fn =
+        new DoFn<Integer, Integer>() {
+          @FinishBundle
+          public void finishBundle(@SuppressWarnings("unused") Context c) {
+            throw new IllegalArgumentException("bogus");
+          }
+
+          @ProcessElement
+          public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+        };
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectMessage("bogus");
+    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeFinishBundle(null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
new file mode 100644
index 0000000..7bfdddc
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokersTest.Invocations;
+
+/**
+ * Test helper for {@link DoFnInvokersTest}, which needs to test access to private,
+ * package-private and anonymous DoFns that are declared in another class.
+ */
+public class DoFnInvokersTestHelper {
+
+  private static class StaticPrivateDoFn extends DoFn<String, String> {
+    final Invocations invocations;
+
+    public StaticPrivateDoFn(Invocations invocations) {
+      this.invocations = invocations;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      invocations.wasProcessElementInvoked = true;
+    }
+  }
+
+  private class InnerPrivateDoFn extends DoFn<String, String> {
+    final Invocations invocations;
+
+    public InnerPrivateDoFn(Invocations invocations) {
+      this.invocations = invocations;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      invocations.wasProcessElementInvoked = true;
+    }
+  }
+
+  static class StaticPackagePrivateDoFn extends DoFn<String, String> {
+    final Invocations invocations;
+
+    public StaticPackagePrivateDoFn(Invocations invocations) {
+      this.invocations = invocations;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      invocations.wasProcessElementInvoked = true;
+    }
+  }
+
+  class InnerPackagePrivateDoFn extends DoFn<String, String> {
+    final Invocations invocations;
+
+    public InnerPackagePrivateDoFn(Invocations invocations) {
+      this.invocations = invocations;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      invocations.wasProcessElementInvoked = true;
+    }
+  }
+
+  public static DoFn<String, String> newStaticPackagePrivateDoFn(
+      Invocations invocations) {
+    return new StaticPackagePrivateDoFn(invocations);
+  }
+
+  public DoFn<String, String> newInnerPackagePrivateDoFn(Invocations invocations) {
+    return new InnerPackagePrivateDoFn(invocations);
+  }
+
+  public static DoFn<String, String> newStaticPrivateDoFn(Invocations invocations) {
+    return new StaticPrivateDoFn(invocations);
+  }
+
+  public DoFn<String, String> newInnerPrivateDoFn(Invocations invocations) {
+    return new InnerPrivateDoFn(invocations);
+  }
+
+  public DoFn<String, String> newInnerAnonymousDoFn(final Invocations invocations) {
+    return new DoFn<String, String>() {
+      @ProcessElement
+      public void process(ProcessContext c) {
+        invocations.wasProcessElementInvoked = true;
+      }
+    };
+  }
+
+  public static DoFn<String, String> newStaticAnonymousDoFn(
+      final Invocations invocations) {
+    return new DoFn<String, String>() {
+      @ProcessElement
+      public void process(ProcessContext c) {
+        invocations.wasProcessElementInvoked = true;
+      }
+    };
+  }
+}


[4/4] incubator-beam git commit: Closes #812

Posted by bc...@apache.org.
Closes #812


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/89367cfb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/89367cfb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/89367cfb

Branch: refs/heads/master
Commit: 89367cfb19ae86d66441970277177512961d3b6a
Parents: 4609773 fbf77f9
Author: bchambers <bc...@google.com>
Authored: Wed Aug 17 15:43:47 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Aug 17 15:43:47 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/DoFn.java    |   17 +-
 .../beam/sdk/transforms/DoFnAdapters.java       |  281 +++++
 .../beam/sdk/transforms/DoFnReflector.java      | 1150 ------------------
 .../apache/beam/sdk/transforms/DoFnTester.java  |    2 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |    6 +-
 .../sdk/transforms/reflect/DoFnInvoker.java     |   61 +
 .../sdk/transforms/reflect/DoFnInvokers.java    |  506 ++++++++
 .../sdk/transforms/reflect/DoFnSignature.java   |  113 ++
 .../sdk/transforms/reflect/DoFnSignatures.java  |  321 +++++
 .../sdk/transforms/reflect/package-info.java    |   23 +
 .../beam/sdk/transforms/DoFnReflectorTest.java  |  822 -------------
 .../apache/beam/sdk/transforms/FlattenTest.java |    4 +-
 .../dofnreflector/DoFnReflectorTestHelper.java  |  116 --
 .../transforms/reflect/DoFnInvokersTest.java    |  498 ++++++++
 .../reflect/DoFnInvokersTestHelper.java         |  116 ++
 .../transforms/reflect/DoFnSignaturesTest.java  |  371 ++++++
 .../transforms/DoFnInvokersBenchmark.java       |  224 ++++
 .../transforms/DoFnReflectorBenchmark.java      |  232 ----
 18 files changed, 2529 insertions(+), 2334 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-beam git commit: Rewrites DoFnReflector to go via DoFnSignature

Posted by bc...@apache.org.
Rewrites DoFnReflector to go via DoFnSignature

DoFnSignature encapsulates type information about a DoFn,
in particular which arguments/features its methods
actually use.

Before this commit, DoFnReflector would parse/verify/generate
code in one go; after this commit, these stages are separated:
DoFnSignature encapsulates all information needed to generate
the code.

Additionally, removes the unnecessary genericity in the
implementation of DoFnReflector's code generation for the
very different methods processElement and start/finishBundle.
The code is simpler if decomposed into utility functions,
rather than attempting a uniform representation for different
methods.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fbf77f90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fbf77f90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fbf77f90

Branch: refs/heads/master
Commit: fbf77f90e0391304a580178f99441256526c4b0e
Parents: 4609773
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Aug 9 17:16:00 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Aug 17 15:43:46 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/DoFn.java    |   17 +-
 .../beam/sdk/transforms/DoFnAdapters.java       |  281 +++++
 .../beam/sdk/transforms/DoFnReflector.java      | 1150 ------------------
 .../apache/beam/sdk/transforms/DoFnTester.java  |    2 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |    6 +-
 .../sdk/transforms/reflect/DoFnInvoker.java     |   61 +
 .../sdk/transforms/reflect/DoFnInvokers.java    |  506 ++++++++
 .../sdk/transforms/reflect/DoFnSignature.java   |  113 ++
 .../sdk/transforms/reflect/DoFnSignatures.java  |  321 +++++
 .../sdk/transforms/reflect/package-info.java    |   23 +
 .../beam/sdk/transforms/DoFnReflectorTest.java  |  822 -------------
 .../apache/beam/sdk/transforms/FlattenTest.java |    4 +-
 .../dofnreflector/DoFnReflectorTestHelper.java  |  116 --
 .../transforms/reflect/DoFnInvokersTest.java    |  498 ++++++++
 .../reflect/DoFnInvokersTestHelper.java         |  116 ++
 .../transforms/reflect/DoFnSignaturesTest.java  |  371 ++++++
 .../transforms/DoFnInvokersBenchmark.java       |  224 ++++
 .../transforms/DoFnReflectorBenchmark.java      |  232 ----
 18 files changed, 2529 insertions(+), 2334 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 80b67af..2348783 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -247,7 +248,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
 
   /////////////////////////////////////////////////////////////////////////////
 
-  Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>();
+  protected Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>();
 
   /**
    * Protects aggregators from being created after initialization.
@@ -283,7 +284,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
   /**
    * Interface for runner implementors to provide implementations of extra context information.
    *
-   * <p>The methods on this interface are called by {@link DoFnReflector} before invoking an
+   * <p>The methods on this interface are called by {@link DoFnInvoker} before invoking an
    * annotated {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that
    * has indicated it needs the given extra context.
    *
@@ -301,23 +302,23 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
     BoundedWindow window();
 
     /**
-     * A placeholder for testing purposes. The return type itself is package-private and not
-     * implemented.
+     * A placeholder for testing purposes.
      */
     InputProvider<InputT> inputProvider();
 
     /**
-     * A placeholder for testing purposes. The return type itself is package-private and not
-     * implemented.
+     * A placeholder for testing purposes.
      */
     OutputReceiver<OutputT> outputReceiver();
   }
 
-  static interface OutputReceiver<T> {
+  /** A placeholder for testing handling of output types during {@link DoFn} reflection. */
+  public interface OutputReceiver<T> {
     void output(T output);
   }
 
-  static interface InputProvider<T> {
+  /** A placeholder for testing handling of input types during {@link DoFn} reflection. */
+  public interface InputProvider<T> {
     T get();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
new file mode 100644
index 0000000..71a148f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+
+/**
+ * Utility class containing adapters for running a {@link DoFn} as an {@link OldDoFn}.
+ *
+ * @deprecated This class will go away when we start running {@link DoFn}'s directly (using
+ * {@link DoFnInvoker}) rather than via {@link OldDoFn}.
+ */
+@Deprecated
+public class DoFnAdapters {
+  /** Should not be instantiated. */
+  private DoFnAdapters() {}
+
+  /**
+   * If this is an {@link OldDoFn} produced via {@link #toOldDoFn}, returns the class of the
+   * original {@link DoFn}, otherwise returns {@code fn.getClass()}.
+   */
+  public static Class<?> getDoFnClass(OldDoFn<?, ?> fn) {
+    if (fn instanceof SimpleDoFnAdapter) {
+      return ((SimpleDoFnAdapter<?, ?>) fn).fn.getClass();
+    } else {
+      return fn.getClass();
+    }
+  }
+
+  /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
+  public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
+    DoFnSignature signature = DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass());
+    if (signature.processElement().usesSingleWindow()) {
+      return new WindowDoFnAdapter<>(fn);
+    } else {
+      return new SimpleDoFnAdapter<>(fn);
+    }
+  }
+
+  /**
+   * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
+   * OldDoFn}.
+   */
+  private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
+    private final DoFn<InputT, OutputT> fn;
+    private transient DoFnInvoker<InputT, OutputT> invoker;
+
+    SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
+      super(fn.aggregators);
+      this.fn = fn;
+      this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+    }
+
+    @Override
+    public void setup() throws Exception {
+      this.invoker.invokeSetup();
+    }
+
+    @Override
+    public void startBundle(Context c) throws Exception {
+      this.fn.prepareForProcessing();
+      invoker.invokeStartBundle(new ContextAdapter<>(fn, c));
+    }
+
+    @Override
+    public void finishBundle(Context c) throws Exception {
+      invoker.invokeFinishBundle(new ContextAdapter<>(fn, c));
+    }
+
+    @Override
+    public void teardown() throws Exception {
+      this.invoker.invokeTeardown();
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
+      invoker.invokeProcessElement(adapter, adapter);
+    }
+
+    @Override
+    protected TypeDescriptor<InputT> getInputTypeDescriptor() {
+      return fn.getInputTypeDescriptor();
+    }
+
+    @Override
+    protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+      return fn.getOutputTypeDescriptor();
+    }
+
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      return fn.getAllowedTimestampSkew();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.include(fn);
+    }
+
+    private void readObject(java.io.ObjectInputStream in)
+        throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+      this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+    }
+  }
+
+  /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */
+  private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT>
+      implements OldDoFn.RequiresWindowAccess {
+
+    WindowDoFnAdapter(DoFn<InputT, OutputT> fn) {
+      super(fn);
+    }
+  }
+
+  /**
+   * Wraps an {@link OldDoFn.Context} as a {@link DoFn.ExtraContextFactory} inside a {@link
+   * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is
+   * unavailable.
+   */
+  private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
+      implements DoFn.ExtraContextFactory<InputT, OutputT> {
+
+    private OldDoFn<InputT, OutputT>.Context context;
+
+    private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
+      fn.super();
+      this.context = context;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public void output(OutputT output) {
+      context.output(output);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      context.outputWithTimestamp(output, timestamp);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      context.sideOutput(tag, output);
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      context.sideOutputWithTimestamp(tag, output, timestamp);
+    }
+
+    @Override
+    public BoundedWindow window() {
+      // The DoFn doesn't allow us to ask for these outside ProcessElements, so this
+      // should be unreachable.
+      throw new UnsupportedOperationException("Can only get the window in ProcessElements");
+    }
+
+    @Override
+    public DoFn.InputProvider<InputT> inputProvider() {
+      throw new UnsupportedOperationException("inputProvider() exists only for testing");
+    }
+
+    @Override
+    public DoFn.OutputReceiver<OutputT> outputReceiver() {
+      throw new UnsupportedOperationException("outputReceiver() exists only for testing");
+    }
+  }
+
+  /**
+   * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFn.ExtraContextFactory} inside a {@link
+   * DoFn.ProcessElement} method.
+   */
+  private static class ProcessContextAdapter<InputT, OutputT>
+      extends DoFn<InputT, OutputT>.ProcessContext
+      implements DoFn.ExtraContextFactory<InputT, OutputT> {
+
+    private OldDoFn<InputT, OutputT>.ProcessContext context;
+
+    private ProcessContextAdapter(
+        DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) {
+      fn.super();
+      this.context = context;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      return context.sideInput(view);
+    }
+
+    @Override
+    public void output(OutputT output) {
+      context.output(output);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      context.outputWithTimestamp(output, timestamp);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      context.sideOutput(tag, output);
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      context.sideOutputWithTimestamp(tag, output, timestamp);
+    }
+
+    @Override
+    public InputT element() {
+      return context.element();
+    }
+
+    @Override
+    public Instant timestamp() {
+      return context.timestamp();
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return context.pane();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return context.window();
+    }
+
+    @Override
+    public DoFn.InputProvider<InputT> inputProvider() {
+      throw new UnsupportedOperationException("inputProvider() exists only for testing");
+    }
+
+    @Override
+    public DoFn.OutputReceiver<OutputT> outputReceiver() {
+      throw new UnsupportedOperationException("outputReceiver() exists only for testing");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
deleted file mode 100644
index bf04041..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
+++ /dev/null
@@ -1,1150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
-import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
-import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
-import org.apache.beam.sdk.transforms.DoFn.Setup;
-import org.apache.beam.sdk.transforms.DoFn.StartBundle;
-import org.apache.beam.sdk.transforms.DoFn.Teardown;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.reflect.TypeParameter;
-import com.google.common.reflect.TypeToken;
-
-import net.bytebuddy.ByteBuddy;
-import net.bytebuddy.NamingStrategy.SuffixingRandom;
-import net.bytebuddy.description.field.FieldDescription;
-import net.bytebuddy.description.method.MethodDescription;
-import net.bytebuddy.description.method.ParameterList;
-import net.bytebuddy.description.modifier.FieldManifestation;
-import net.bytebuddy.description.modifier.Visibility;
-import net.bytebuddy.description.type.TypeDescription;
-import net.bytebuddy.description.type.TypeDescription.Generic;
-import net.bytebuddy.dynamic.DynamicType;
-import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
-import net.bytebuddy.dynamic.scaffold.InstrumentedType;
-import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy.Default;
-import net.bytebuddy.implementation.Implementation;
-import net.bytebuddy.implementation.MethodCall.MethodLocator;
-import net.bytebuddy.implementation.StubMethod;
-import net.bytebuddy.implementation.bind.MethodDelegationBinder.MethodInvoker;
-import net.bytebuddy.implementation.bind.annotation.TargetMethodAnnotationDrivenBinder.TerminationHandler;
-import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
-import net.bytebuddy.implementation.bytecode.Duplication;
-import net.bytebuddy.implementation.bytecode.StackManipulation;
-import net.bytebuddy.implementation.bytecode.Throw;
-import net.bytebuddy.implementation.bytecode.assign.Assigner;
-import net.bytebuddy.implementation.bytecode.member.FieldAccess;
-import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
-import net.bytebuddy.implementation.bytecode.member.MethodReturn;
-import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
-import net.bytebuddy.jar.asm.Label;
-import net.bytebuddy.jar.asm.MethodVisitor;
-import net.bytebuddy.jar.asm.Opcodes;
-import net.bytebuddy.matcher.ElementMatchers;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-
-/**
- * Utility implementing the necessary reflection for working with {@link DoFn}s.
- */
-public abstract class DoFnReflector {
-
-  private static final String FN_DELEGATE_FIELD_NAME = "delegate";
-
-  /**
-   * Describes in which {@link DoFn} methods an {@link AdditionalParameter} may be requested.
-   * The static initializer of this class uses it to sort each parameter into
-   * {@code EXTRA_CONTEXTS} or {@code EXTRA_PROCESS_CONTEXTS}.
-   */
-  private enum Availability {
-    /** Indicates parameters only available in {@code @ProcessElement} methods. */
-    PROCESS_ELEMENT_ONLY,
-    /** Indicates parameters available in all methods. */
-    EVERYWHERE
-  }
-
-  /**
-   * Enumeration of the parameters available from the {@link ExtraContextFactory} to use as
-   * additional parameters for {@link DoFn} methods.
-   *
-   * <p>We don't rely on looking for properly annotated methods within {@link ExtraContextFactory}
-   * because erasure would make it impossible to completely fill in the type token for context
-   * parameters that depend on the input/output type.
-   *
-   * <p>Each constant records the raw parameter class it matches, where it may be used, and the
-   * zero-argument {@link ExtraContextFactory} method that supplies its value.
-   */
-  private enum AdditionalParameter {
-
-    /** Any {@link BoundedWindow} parameter is populated by the window of the current element. */
-    WINDOW_OF_ELEMENT(Availability.PROCESS_ELEMENT_ONLY, BoundedWindow.class, "window") {
-      @Override
-      public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
-        // Not generic, so the input/output tokens are irrelevant.
-        return TypeToken.of(BoundedWindow.class);
-      }
-    },
-
-    /**
-     * A {@link DoFn.InputProvider} parameter, supplied by
-     * {@link ExtraContextFactory#inputProvider()}. Hidden from error messages.
-     */
-    INPUT_PROVIDER(Availability.PROCESS_ELEMENT_ONLY, DoFn.InputProvider.class, "inputProvider") {
-      @Override
-      public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
-        // Substitute the DoFn's actual input type for the InputT placeholder.
-        return new TypeToken<DoFn.InputProvider<InputT>>() {}.where(
-            new TypeParameter<InputT>() {}, in);
-      }
-
-      @Override
-      public boolean isHidden() {
-        return true;
-      }
-    },
-
-    /**
-     * A {@link DoFn.OutputReceiver} parameter, supplied by
-     * {@link ExtraContextFactory#outputReceiver()}. Hidden from error messages.
-     */
-    OUTPUT_RECEIVER(
-        Availability.PROCESS_ELEMENT_ONLY, DoFn.OutputReceiver.class, "outputReceiver") {
-      @Override
-      public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
-        // Substitute the DoFn's actual output type for the OutputT placeholder.
-        return new TypeToken<DoFn.OutputReceiver<OutputT>>() {}.where(
-            new TypeParameter<OutputT>() {}, out);
-      }
-
-      @Override
-      public boolean isHidden() {
-        return true;
-      }
-    };
-
-    /**
-     * Create a type token representing the given parameter. May use the type token associated
-     * with the input and output types of the {@link DoFn}, depending on the extra
-     * context.
-     *
-     * @param in token for the input type of the {@link DoFn}
-     * @param out token for the output type of the {@link DoFn}
-     * @return the fully resolved type a method parameter must be compatible with
-     */
-    abstract <InputT, OutputT> TypeToken<?> tokenFor(
-        TypeToken<InputT> in, TypeToken<OutputT> out);
-
-    /**
-     * Indicates whether this enum is for testing only, hence should not appear in error messages,
-     * etc. Defaults to {@code false}.
-     */
-    boolean isHidden() {
-      return false;
-    }
-
-    /** Raw class of the method parameter this constant matches; used as the lookup key. */
-    private final Class<?> rawType;
-    /** Which {@link DoFn} methods may declare this parameter. */
-    private final Availability availability;
-    /**
-     * The {@link ExtraContextFactory} method that produces the parameter value.
-     * NOTE(review): {@code transient} looks redundant on an enum field - confirm before removing.
-     */
-    private final transient MethodDescription method;
-
-    /**
-     * @param availability where the parameter may be used
-     * @param rawType raw class of the parameter
-     * @param method name of the zero-argument {@link ExtraContextFactory} method to resolve
-     * @throws RuntimeException if that method cannot be found or accessed
-     */
-    private AdditionalParameter(Availability availability, Class<?> rawType, String method) {
-      this.availability = availability;
-      this.rawType = rawType;
-      try {
-        this.method = new MethodDescription.ForLoadedMethod(
-            ExtraContextFactory.class.getMethod(method));
-      } catch (NoSuchMethodException | SecurityException e) {
-        throw new RuntimeException(
-            "Unable to access method " + method + " on " + ExtraContextFactory.class, e);
-      }
-    }
-  }
-
-  /** Parameters usable in every method, keyed by raw parameter class. */
-  private static final Map<Class<?>, AdditionalParameter> EXTRA_CONTEXTS;
-  /** Parameters usable in {@code @ProcessElement}: the process-only ones plus the above. */
-  private static final Map<Class<?>, AdditionalParameter> EXTRA_PROCESS_CONTEXTS;
-
-  static {
-    ImmutableMap.Builder<Class<?>, AdditionalParameter> availableEverywhere =
-        ImmutableMap.builder();
-    ImmutableMap.Builder<Class<?>, AdditionalParameter> processElementOnly =
-        ImmutableMap.builder();
-
-    // Sort every known parameter into the bucket matching its availability.
-    for (AdditionalParameter parameter : AdditionalParameter.values()) {
-      if (parameter.availability == Availability.EVERYWHERE) {
-        availableEverywhere.put(parameter.rawType, parameter);
-      } else if (parameter.availability == Availability.PROCESS_ELEMENT_ONLY) {
-        processElementOnly.put(parameter.rawType, parameter);
-      }
-    }
-
-    EXTRA_CONTEXTS = availableEverywhere.build();
-    // Process Element contexts include everything available everywhere
-    processElementOnly.putAll(EXTRA_CONTEXTS);
-    EXTRA_PROCESS_CONTEXTS = processElementOnly.build();
-  }
-
-  /**
-   * Reports whether the reflected {@link DoFn} uses a single window. {@link #toDoFn} uses the
-   * answer to choose between the window-aware adapter and the plain one.
-   *
-   * @return true if the reflected {@link DoFn} uses a Single Window.
-   */
-  public abstract boolean usesSingleWindow();
-
-  /**
-   * Create a {@link DoFnInvoker} bound to the given {@link DoFn}.
-   *
-   * @param fn the instance whose annotated methods the invoker will call
-   * @return an invoker that dispatches to {@code fn}
-   */
-  public abstract <InputT, OutputT> DoFnInvoker<InputT, OutputT> bindInvoker(
-      DoFn<InputT, OutputT> fn);
-
-  /**
-   * Cache of reflectors keyed by {@link DoFn} class. {@link LinkedHashMap} is not thread-safe,
-   * so every access must hold this map's monitor.
-   */
-  private static final Map<Class<?>, DoFnReflector> REFLECTOR_CACHE =
-      new LinkedHashMap<Class<?>, DoFnReflector>();
-
-  /**
-   * Returns the cached {@link DoFnReflector} for the given {@link DoFn} class, creating and
-   * caching it on first use.
-   *
-   * <p>The lookup and the insertion happen under one lock. This method is static and is also
-   * reached while adapters are being deserialized, so it may run on several threads at once;
-   * without the lock, concurrent callers could corrupt the map or build duplicate reflectors
-   * (each of which generates a new invoker class).
-   *
-   * @param fn the {@link DoFn} class to reflect over
-   * @return the {@link DoFnReflector} for the given {@link DoFn}.
-   * @throws IllegalStateException if {@code fn} does not declare valid annotated methods
-   */
-  public static DoFnReflector of(
-      @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) {
-    synchronized (REFLECTOR_CACHE) {
-      DoFnReflector reflector = REFLECTOR_CACHE.get(fn);
-      if (reflector == null) {
-        reflector = new GenericDoFnReflector(fn);
-        REFLECTOR_CACHE.put(fn, reflector);
-      }
-      return reflector;
-    }
-  }
-
-  /**
-   * Create an {@link OldDoFn} that wraps the given {@link DoFn}.
-   *
-   * <p>The adapter additionally implements {@link OldDoFn.RequiresWindowAccess} when
-   * {@link #usesSingleWindow()} reports {@code true}.
-   */
-  public <InputT, OutputT> OldDoFn<InputT, OutputT> toDoFn(DoFn<InputT, OutputT> fn) {
-    if (!usesSingleWindow()) {
-      return new SimpleDoFnAdapter<InputT, OutputT>(this, fn);
-    }
-    return new WindowDoFnAdapter<InputT, OutputT>(this, fn);
-  }
-
-  /** Renders the type behind {@code t} using {@code ReflectHelpers.TYPE_SIMPLE_DESCRIPTION}. */
-  private static String formatType(TypeToken<?> t) {
-    Type underlying = t.getType();
-    return ReflectHelpers.TYPE_SIMPLE_DESCRIPTION.apply(underlying);
-  }
-
-  private static String format(Method m) {
-    return ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(m);
-  }
-
-  /**
-   * Lists, for use in error messages, the formatted types of every non-hidden parameter in
-   * {@code extraProcessContexts}, resolved against the given input and output types and
-   * sorted case-insensitively.
-   */
-  private static Collection<String> describeSupportedTypes(
-      Map<Class<?>, AdditionalParameter> extraProcessContexts,
-      final TypeToken<?> in, final TypeToken<?> out) {
-    // Test-only parameters must not be advertised to users.
-    Predicate<AdditionalParameter> notHidden = new Predicate<AdditionalParameter>() {
-      @Override
-      public boolean apply(@Nonnull AdditionalParameter candidate) {
-        return !candidate.isHidden();
-      }
-    };
-    Function<AdditionalParameter, String> toDescription =
-        new Function<AdditionalParameter, String>() {
-          @Override
-          @Nonnull
-          public String apply(@Nonnull AdditionalParameter visible) {
-            return formatType(visible.tokenFor(in, out));
-          }
-        };
-
-    return FluentIterable
-        .from(extraProcessContexts.values())
-        .filter(notHidden)
-        .transform(toDescription)
-        .toSortedSet(String.CASE_INSENSITIVE_ORDER);
-  }
-
-  /**
-   * Validates a {@code @ProcessElement} method: its first argument must be a
-   * {@code DoFn.ProcessContext} and the rest must come from {@code EXTRA_PROCESS_CONTEXTS}.
-   *
-   * @return the additional parameters, in declaration order
-   */
-  @VisibleForTesting
-  static <InputT, OutputT> List<AdditionalParameter> verifyProcessMethodArguments(Method m) {
-    TypeToken<DoFn<InputT, OutputT>.ProcessContext> processContext =
-        new TypeToken<DoFn<InputT, OutputT>.ProcessContext>() {};
-    TypeParameter<InputT> inputParameter = new TypeParameter<InputT>() {};
-    TypeParameter<OutputT> outputParameter = new TypeParameter<OutputT>() {};
-    return verifyMethodArguments(
-        m, EXTRA_PROCESS_CONTEXTS, processContext, inputParameter, outputParameter);
-  }
-
-  /**
-   * Validates a {@code @StartBundle} or {@code @FinishBundle} method: its first argument must
-   * be a {@code DoFn.Context} and the rest must come from {@code EXTRA_CONTEXTS}.
-   *
-   * @return the additional parameters in declaration order, or {@code null} when {@code m} is
-   *     {@code null} (the method is optional)
-   */
-  @VisibleForTesting
-  static <InputT, OutputT> List<AdditionalParameter> verifyBundleMethodArguments(Method m) {
-    if (m != null) {
-      TypeToken<DoFn<InputT, OutputT>.Context> bundleContext =
-          new TypeToken<DoFn<InputT, OutputT>.Context>() {};
-      TypeParameter<InputT> inputParameter = new TypeParameter<InputT>() {};
-      TypeParameter<OutputT> outputParameter = new TypeParameter<OutputT>() {};
-      return verifyMethodArguments(
-          m, EXTRA_CONTEXTS, bundleContext, inputParameter, outputParameter);
-    }
-    return null;
-  }
-
-  /**
-   * Validates a {@code @Setup} or {@code @Teardown} method: it must return {@code void} and
-   * take no arguments. A {@code null} method (none declared) is accepted.
-   *
-   * @throws IllegalStateException if either requirement is violated
-   */
-  @VisibleForTesting
-  static void verifyLifecycleMethodArguments(Method m) {
-    if (m != null) {
-      boolean returnsVoid = void.class.equals(m.getReturnType());
-      checkState(returnsVoid, "%s must have void return type", format(m));
-      int argumentCount = m.getGenericParameterTypes().length;
-      checkState(argumentCount == 0, "%s must take zero arguments", format(m));
-    }
-  }
-
-  /**
-   * Verify the method arguments for a given {@link DoFn} method.
-   *
-   * <p>The requirements for a method to be valid, are:
-   * <ol>
-   * <li>The method returns {@code void} and is not a varargs method.
-   * <li>The method has at least one argument.
-   * <li>The first argument is of type firstContextArg, declared with the type parameters of
-   * its enclosing {@link DoFn} (i.e. not as a raw type).
-   * <li>The remaining arguments have raw types that appear in {@code contexts}
-   * <li>Any generics on the extra context arguments match what is expected. Currently, this
-   * is exercised only by placeholders. For example, {@code InputProvider<InputT>} must either
-   * match the {@code InputT} of {@code DoFn<InputT, OutputT>.ProcessContext} or use a
-   * wildcard, etc.
-   * </ol>
-   *
-   * @param m the method to verify
-   * @param contexts mapping from raw classes to the {@link AdditionalParameter} used
-   *     to create new instances.
-   * @param firstContextArg the expected type of the first context argument
-   * @param iParam TypeParameter representing the input type
-   * @param oParam TypeParameter representing the output type
-   * @return the {@link AdditionalParameter} for each argument after the first, in order
-   * @throws IllegalStateException if any of the requirements above is violated
-   */
-  @VisibleForTesting
-  static <InputT, OutputT> List<AdditionalParameter> verifyMethodArguments(
-      Method m,
-      Map<Class<?>, AdditionalParameter> contexts,
-      TypeToken<?> firstContextArg,
-      TypeParameter<InputT> iParam,
-      TypeParameter<OutputT> oParam) {
-
-    if (!void.class.equals(m.getReturnType())) {
-      throw new IllegalStateException(String.format(
-          "%s must have a void return type", format(m)));
-    }
-    if (m.isVarArgs()) {
-      throw new IllegalStateException(String.format(
-          "%s must not have var args", format(m)));
-    }
-
-    // The first parameter must be present, and must be the specified type
-    Type[] params = m.getGenericParameterTypes();
-    TypeToken<?> contextToken = null;
-    if (params.length > 0) {
-      contextToken = TypeToken.of(params[0]);
-    }
-    if (contextToken == null
-        || !contextToken.getRawType().equals(firstContextArg.getRawType())) {
-      throw new IllegalStateException(String.format(
-          "%s must take a %s as its first argument",
-          format(m), firstContextArg.getRawType().getSimpleName()));
-    }
-    AdditionalParameter[] contextInfos = new AdditionalParameter[params.length - 1];
-
-    // The input and output types are read from the owner of the context type, since
-    // ProcessContext and Context are owned by DoFn. A context declared as a raw type carries
-    // no such type arguments; report that as a validation error rather than letting a cast
-    // fail with a ClassCastException.
-    Type contextType = contextToken.getType();
-    Type ownerType = (contextType instanceof ParameterizedType)
-        ? ((ParameterizedType) contextType).getOwnerType()
-        : null;
-    if (!(ownerType instanceof ParameterizedType)) {
-      throw new IllegalStateException(String.format(
-          "%s must declare its %s argument with the type parameters of its DoFn,"
-              + " not as a raw type",
-          format(m), firstContextArg.getRawType().getSimpleName()));
-    }
-    Type[] fnTypeArguments = ((ParameterizedType) ownerType).getActualTypeArguments();
-    @SuppressWarnings("unchecked")
-    TypeToken<InputT> iActual = (TypeToken<InputT>) TypeToken.of(fnTypeArguments[0]);
-    @SuppressWarnings("unchecked")
-    TypeToken<OutputT> oActual = (TypeToken<OutputT>) TypeToken.of(fnTypeArguments[1]);
-
-    // Every remaining parameter must have a raw type registered in contexts.
-    for (int i = 1; i < params.length; i++) {
-      TypeToken<?> param = TypeToken.of(params[i]);
-
-      AdditionalParameter info = contexts.get(param.getRawType());
-      if (info == null) {
-        throw new IllegalStateException(String.format(
-            "%s is not a valid context parameter for method %s. Should be one of %s",
-            formatType(param), format(m),
-            describeSupportedTypes(contexts, iActual, oActual)));
-      }
-
-      // If we get here, the class matches, but maybe the generics don't:
-      TypeToken<?> expected = info.tokenFor(iActual, oActual);
-      if (!expected.isSubtypeOf(param)) {
-        throw new IllegalStateException(String.format(
-            "Incompatible generics in context parameter %s for method %s. Should be %s",
-            formatType(param), format(m), formatType(expected)));
-      }
-
-      // Register the (now validated) context info
-      contextInfos[i - 1] = info;
-    }
-    return ImmutableList.copyOf(contextInfos);
-  }
-
-  /**
-   * Interface for invoking the annotated processing methods of a bound {@link DoFn}.
-   * Instances are obtained from {@link DoFnReflector#bindInvoker}.
-   */
-  public interface DoFnInvoker<InputT, OutputT>  {
-    /** Invoke the {@link Setup}-annotated method on the bound {@code DoFn}. */
-    void invokeSetup();
-    /**
-     * Invoke the {@link StartBundle}-annotated method on the bound {@code DoFn}.
-     *
-     * @param c the context passed as the first argument
-     * @param extra source of any additional parameters the method declares
-     */
-    void invokeStartBundle(
-        DoFn<InputT, OutputT>.Context c,
-        ExtraContextFactory<InputT, OutputT> extra);
-    /**
-     * Invoke the {@link FinishBundle}-annotated method on the bound {@code DoFn}.
-     *
-     * @param c the context passed as the first argument
-     * @param extra source of any additional parameters the method declares
-     */
-    void invokeFinishBundle(
-        DoFn<InputT, OutputT>.Context c,
-        ExtraContextFactory<InputT, OutputT> extra);
-
-    /** Invoke the {@link Teardown}-annotated method on the bound {@code DoFn}. */
-    void invokeTeardown();
-
-    /**
-     * Invoke the {@link ProcessElement}-annotated method on the bound {@code DoFn}.
-     *
-     * @param c the process context passed as the first argument
-     * @param extra source of any additional parameters the method declares
-     */
-    public void invokeProcessElement(
-        DoFn<InputT, OutputT>.ProcessContext c,
-        ExtraContextFactory<InputT, OutputT> extra);
-  }
-
-  /**
-   * Implementation of {@link DoFnReflector} for the arbitrary {@link DoFn}.
-   *
-   * <p>On construction it locates the annotated methods of the class, validates their
-   * signatures, and generates a {@link DoFnInvoker} subclass with ByteBuddy whose constructor is
-   * kept for {@link #bindInvoker}.
-   */
-  private static class GenericDoFnReflector extends DoFnReflector {
-
-    // The annotated methods; every one except processElement may be null when not declared.
-    private final Method setup;
-    private final Method startBundle;
-    private final Method processElement;
-    private final Method finishBundle;
-    private final Method teardown;
-    // Additional parameters of each method; the bundle lists are null when the method is absent.
-    private final List<AdditionalParameter> processElementArgs;
-    private final List<AdditionalParameter> startBundleArgs;
-    private final List<AdditionalParameter> finishBundleArgs;
-    // Constructor of the generated invoker class; takes the DoFn instance to delegate to.
-    private final Constructor<?> constructor;
-
-    /**
-     * @param fn the {@link DoFn} class to analyze
-     * @throws IllegalStateException if a required method is missing or a signature is invalid
-     */
-    private GenericDoFnReflector(
-        @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) {
-      // Locate the annotated methods
-      this.processElement = findAnnotatedMethod(ProcessElement.class, fn, true);
-      this.setup = findAnnotatedMethod(Setup.class, fn, false);
-      this.startBundle = findAnnotatedMethod(StartBundle.class, fn, false);
-      this.finishBundle = findAnnotatedMethod(FinishBundle.class, fn, false);
-      this.teardown = findAnnotatedMethod(Teardown.class, fn, false);
-
-      // Verify that their method arguments satisfy our conditions.
-      this.processElementArgs = verifyProcessMethodArguments(processElement);
-      this.startBundleArgs = verifyBundleMethodArguments(startBundle);
-      this.finishBundleArgs = verifyBundleMethodArguments(finishBundle);
-      verifyLifecycleMethodArguments(setup);
-      verifyLifecycleMethodArguments(teardown);
-
-      // Must run last: it reads the method and argument fields assigned above.
-      this.constructor = createInvokerConstructor(fn);
-    }
-
-    /**
-     * Collects every method carrying {@code anno} that is declared on {@code startClass} or one
-     * of its superclasses (stopping before {@code stopClass}), followed by matching methods
-     * found on the interfaces those classes implement.
-     */
-    private static Collection<Method> declaredMethodsWithAnnotation(
-        Class<? extends Annotation> anno,
-        Class<?> startClass, Class<?> stopClass) {
-      Collection<Method> matches = new ArrayList<>();
-
-      Class<?> clazz = startClass;
-      LinkedHashSet<Class<?>> interfaces = new LinkedHashSet<>();
-
-      // First, find all declared methods on the startClass and parents (up to stopClass)
-      while (clazz != null && !clazz.equals(stopClass)) {
-        for (Method method : clazz.getDeclaredMethods()) {
-          if (method.isAnnotationPresent(anno)) {
-            matches.add(method);
-          }
-        }
-
-        Collections.addAll(interfaces, clazz.getInterfaces());
-
-        clazz = clazz.getSuperclass();
-      }
-
-      // Now, iterate over all the discovered interfaces
-      for (Method method : ReflectHelpers.getClosureOfMethodsOnInterfaces(interfaces)) {
-        if (method.isAnnotationPresent(anno)) {
-          matches.add(method);
-        }
-      }
-      return matches;
-    }
-
-    /**
-     * Finds the single method of {@code fnClazz} annotated with {@code anno}.
-     *
-     * @param required whether a missing method is an error
-     * @return the method, or {@code null} if none exists and {@code required} is false
-     * @throws IllegalStateException if a required method is missing, if annotated methods with
-     *     different names or parameter types are found, or if the method is not public or is
-     *     static
-     */
-    private static Method findAnnotatedMethod(
-        Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
-      Collection<Method> matches = declaredMethodsWithAnnotation(
-          anno, fnClazz, DoFn.class);
-
-      if (matches.size() == 0) {
-        if (required) {
-          throw new IllegalStateException(String.format(
-              "No method annotated with @%s found in %s",
-              anno.getSimpleName(), fnClazz.getName()));
-        } else {
-          return null;
-        }
-      }
-
-      // If we have at least one match, then either it should be the only match
-      // or it should be an extension of the other matches (which came from parent
-      // classes).
-      Method first = matches.iterator().next();
-      for (Method other : matches) {
-        if (!first.getName().equals(other.getName())
-            || !Arrays.equals(first.getParameterTypes(), other.getParameterTypes())) {
-          throw new IllegalStateException(String.format(
-              "Found multiple methods annotated with @%s. [%s] and [%s]",
-              anno.getSimpleName(), format(first), format(other)));
-        }
-      }
-
-      // We need to be able to call it. We require it is public.
-      if ((first.getModifiers() & Modifier.PUBLIC) == 0) {
-        throw new IllegalStateException(format(first) + " must be public");
-      }
-
-      // And make sure its not static.
-      if ((first.getModifiers() & Modifier.STATIC) != 0) {
-        throw new IllegalStateException(format(first) + " must not be static");
-      }
-
-      return first;
-    }
-
-    /** Returns true when any analyzed method declares a {@link BoundedWindow} parameter. */
-    @Override
-    public boolean usesSingleWindow() {
-      return usesContext(AdditionalParameter.WINDOW_OF_ELEMENT);
-    }
-
-    /** Returns true if {@code param} is declared by the process or either bundle method. */
-    private boolean usesContext(AdditionalParameter param) {
-      return processElementArgs.contains(param)
-          || (startBundleArgs != null && startBundleArgs.contains(param))
-          || (finishBundleArgs != null && finishBundleArgs.contains(param));
-    }
-
-    /**
-     * Use ByteBuddy to generate the code for a {@link DoFnInvoker} that invokes the given
-     * {@link DoFn}.
-     *
-     * @param clazz the {@link DoFn} class the generated invoker delegates to
-     * @return the public constructor of the generated class, taking one {@code clazz} instance
-     * @throws RuntimeException if that constructor cannot be looked up
-     */
-    private Constructor<? extends DoFnInvoker<?, ?>> createInvokerConstructor(
-        @SuppressWarnings("rawtypes") Class<? extends DoFn> clazz) {
-
-      final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(clazz);
-
-      DynamicType.Builder<?> builder = new ByteBuddy()
-          // Create subclasses inside the target class, to have access to
-          // private and package-private bits
-          .with(new SuffixingRandom("auxiliary") {
-                @Override
-                public String subclass(Generic superClass) {
-                  return super.name(clazzDescription);
-                }
-              })
-          // Create a subclass of DoFnInvoker
-          .subclass(DoFnInvoker.class, Default.NO_CONSTRUCTORS)
-          .defineField(FN_DELEGATE_FIELD_NAME, clazz, Visibility.PRIVATE, FieldManifestation.FINAL)
-          // Define a constructor to populate fields appropriately.
-          .defineConstructor(Visibility.PUBLIC)
-          .withParameter(clazz)
-          .intercept(new InvokerConstructor())
-          // Implement the five invoker methods by calling into the appropriate functions on the
-          // fn. Only invokeStartBundle performs an extra step (prepareForProcessing) first.
-          .method(ElementMatchers.named("invokeProcessElement"))
-          .intercept(InvokerDelegation.create(
-              processElement, BeforeDelegation.NOOP, processElementArgs))
-          .method(ElementMatchers.named("invokeStartBundle"))
-          .intercept(InvokerDelegation.create(
-              startBundle, BeforeDelegation.INVOKE_PREPARE_FOR_PROCESSING, startBundleArgs))
-          .method(ElementMatchers.named("invokeFinishBundle"))
-          .intercept(InvokerDelegation.create(finishBundle,
-              BeforeDelegation.NOOP,
-              finishBundleArgs))
-          .method(ElementMatchers.named("invokeSetup"))
-          .intercept(InvokerDelegation.create(setup,
-              BeforeDelegation.NOOP,
-              Collections.<AdditionalParameter>emptyList()))
-          .method(ElementMatchers.named("invokeTeardown"))
-          .intercept(InvokerDelegation.create(teardown,
-              BeforeDelegation.NOOP,
-              Collections.<AdditionalParameter>emptyList()));
-
-      // Load the generated class by injecting it into the class loader of this reflector class.
-      @SuppressWarnings("unchecked")
-      Class<? extends DoFnInvoker<?, ?>> dynamicClass = (Class<? extends DoFnInvoker<?, ?>>) builder
-          .make()
-          .load(getClass().getClassLoader(), ClassLoadingStrategy.Default.INJECTION)
-          .getLoaded();
-      try {
-        return dynamicClass.getConstructor(clazz);
-      } catch (IllegalArgumentException
-          | NoSuchMethodException
-          | SecurityException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    /**
-     * Instantiates the generated invoker class around {@code fn}.
-     *
-     * @throws RuntimeException if the generated constructor cannot be invoked
-     */
-    @Override
-    public <InputT, OutputT> DoFnInvoker<InputT, OutputT> bindInvoker(
-        DoFn<InputT, OutputT> fn) {
-      try {
-        @SuppressWarnings("unchecked")
-        DoFnInvoker<InputT, OutputT> invoker =
-            (DoFnInvoker<InputT, OutputT>) constructor.newInstance(fn);
-        return invoker;
-      } catch (InstantiationException
-          | IllegalAccessException
-          | IllegalArgumentException
-          | InvocationTargetException
-          | SecurityException e) {
-        throw new RuntimeException("Unable to bind invoker for " + fn.getClass(), e);
-      }
-    }
-  }
-
-  /**
-   * Presents an {@code OldDoFn.Context} as a {@code DoFn.Context} and as the
-   * {@link DoFn.ExtraContextFactory} for bundle-level methods. Output and option calls are
-   * forwarded; every extra-context accessor throws {@link UnsupportedOperationException}.
-   */
-  private static class ContextAdapter<InputT, OutputT>
-      extends DoFn<InputT, OutputT>.Context
-      implements DoFn.ExtraContextFactory<InputT, OutputT> {
-
-    private OldDoFn<InputT, OutputT>.Context context;
-
-    private ContextAdapter(
-        DoFn<InputT, OutputT> enclosingFn, OldDoFn<InputT, OutputT>.Context oldContext) {
-      // DoFn.Context is an inner class, so its constructor needs the owning DoFn instance.
-      enclosingFn.super();
-      this.context = oldContext;
-    }
-
-    // ---- DoFn.Context: forwarded to the wrapped OldDoFn context ----
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return this.context.getPipelineOptions();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      this.context.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      this.context.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      this.context.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      this.context.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    // ---- DoFn.ExtraContextFactory: nothing is available outside ProcessElement ----
-
-    @Override
-    public BoundedWindow window() {
-      // The DoFn doesn't allow us to ask for these outside ProcessElements, so this
-      // should be unreachable.
-      throw new UnsupportedOperationException("Can only get the window in ProcessElements");
-    }
-
-    @Override
-    public DoFn.InputProvider<InputT> inputProvider() {
-      throw new UnsupportedOperationException("inputProvider() exists only for testing");
-    }
-
-    @Override
-    public DoFn.OutputReceiver<OutputT> outputReceiver() {
-      throw new UnsupportedOperationException("outputReceiver() exists only for testing");
-    }
-  }
-
-  /**
-   * Presents an {@code OldDoFn.ProcessContext} as a {@code DoFn.ProcessContext} and as the
-   * {@link DoFn.ExtraContextFactory} for {@code @ProcessElement}. Everything is forwarded to
-   * the wrapped context except the testing-only accessors, which throw
-   * {@link UnsupportedOperationException}.
-   */
-  private static class ProcessContextAdapter<InputT, OutputT>
-      extends DoFn<InputT, OutputT>.ProcessContext
-      implements DoFn.ExtraContextFactory<InputT, OutputT> {
-
-    private OldDoFn<InputT, OutputT>.ProcessContext context;
-
-    private ProcessContextAdapter(
-        DoFn<InputT, OutputT> enclosingFn,
-        OldDoFn<InputT, OutputT>.ProcessContext oldContext) {
-      // DoFn.ProcessContext is an inner class, so its constructor needs the owning DoFn.
-      enclosingFn.super();
-      this.context = oldContext;
-    }
-
-    // ---- Inputs and metadata of the current element ----
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return this.context.getPipelineOptions();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      return this.context.sideInput(view);
-    }
-
-    @Override
-    public InputT element() {
-      return this.context.element();
-    }
-
-    @Override
-    public Instant timestamp() {
-      return this.context.timestamp();
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return this.context.pane();
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return this.context.window();
-    }
-
-    // ---- Outputs ----
-
-    @Override
-    public void output(OutputT output) {
-      this.context.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      this.context.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      this.context.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      this.context.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    // ---- Testing-only extra contexts ----
-
-    @Override
-    public DoFn.InputProvider<InputT> inputProvider() {
-      throw new UnsupportedOperationException("inputProvider() exists only for testing");
-    }
-
-    @Override
-    public DoFn.OutputReceiver<OutputT> outputReceiver() {
-      throw new UnsupportedOperationException("outputReceiver() exists only for testing");
-    }
-  }
-
-  /**
-   * Returns the class of the user's function: the wrapped {@link DoFn}'s class when
-   * {@code fn} is an adapter created by this reflector, otherwise the class of {@code fn}.
-   */
-  public static Class<?> getDoFnClass(OldDoFn<?, ?> fn) {
-    if (!(fn instanceof SimpleDoFnAdapter)) {
-      return fn.getClass();
-    }
-    SimpleDoFnAdapter<?, ?> adapter = (SimpleDoFnAdapter<?, ?>) fn;
-    return adapter.fn.getClass();
-  }
-
-  /**
-   * An {@link OldDoFn} that runs a {@link DoFn} by routing each lifecycle call through a
-   * {@link DoFnInvoker}. The invoker is transient and is rebound in {@code readObject}.
-   */
-  private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
-
-    /** The wrapped user function. */
-    private final DoFn<InputT, OutputT> fn;
-    /** Dispatches to the annotated methods of {@link #fn}. */
-    private transient DoFnInvoker<InputT, OutputT> invoker;
-
-    private SimpleDoFnAdapter(DoFnReflector reflector, DoFn<InputT, OutputT> fn) {
-      super(fn.aggregators);
-      this.fn = fn;
-      this.invoker = reflector.bindInvoker(fn);
-    }
-
-    // ---- Lifecycle, in invocation order ----
-
-    @Override
-    public void setup() throws Exception {
-      this.invoker.invokeSetup();
-    }
-
-    @Override
-    public void startBundle(OldDoFn<InputT, OutputT>.Context c) throws Exception {
-      // One object serves as both the DoFn.Context and the extra-context factory.
-      ContextAdapter<InputT, OutputT> bundleContext = new ContextAdapter<InputT, OutputT>(fn, c);
-      this.invoker.invokeStartBundle(bundleContext, bundleContext);
-    }
-
-    @Override
-    public void processElement(OldDoFn<InputT, OutputT>.ProcessContext c) throws Exception {
-      ProcessContextAdapter<InputT, OutputT> elementContext =
-          new ProcessContextAdapter<InputT, OutputT>(fn, c);
-      this.invoker.invokeProcessElement(elementContext, elementContext);
-    }
-
-    @Override
-    public void finishBundle(OldDoFn<InputT, OutputT>.Context c) throws Exception {
-      ContextAdapter<InputT, OutputT> bundleContext = new ContextAdapter<InputT, OutputT>(fn, c);
-      this.invoker.invokeFinishBundle(bundleContext, bundleContext);
-    }
-
-    @Override
-    public void teardown() {
-      this.invoker.invokeTeardown();
-    }
-
-    // ---- Metadata, answered by the wrapped DoFn ----
-
-    @Override
-    protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-      return this.fn.getInputTypeDescriptor();
-    }
-
-    @Override
-    protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-      return this.fn.getOutputTypeDescriptor();
-    }
-
-    @Override
-    public Duration getAllowedTimestampSkew() {
-      return this.fn.getAllowedTimestampSkew();
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.include(this.fn);
-    }
-
-    /** Restores the non-transient fields, then rebinds the transient invoker. */
-    private void readObject(java.io.ObjectInputStream in)
-        throws IOException, ClassNotFoundException {
-      in.defaultReadObject();
-      DoFnReflector reflector = DoFnReflector.of(fn.getClass());
-      this.invoker = reflector.bindInvoker(fn);
-    }
-  }
-
-  /**
-   * A {@link SimpleDoFnAdapter} that additionally carries the
-   * {@link OldDoFn.RequiresWindowAccess} marker; it adds no behavior of its own.
-   */
-  private static class WindowDoFnAdapter<InputT, OutputT>
-      extends SimpleDoFnAdapter<InputT, OutputT>
-      implements OldDoFn.RequiresWindowAccess {
-
-    private WindowDoFnAdapter(DoFnReflector reflector, DoFn<InputT, OutputT> fn) {
-      super(reflector, fn);
-    }
-  }
-
-  private static enum BeforeDelegation {
-    NOOP {
-      @Override
-      StackManipulation manipulation(
-          TypeDescription delegateType, MethodDescription instrumentedMethod, boolean finalStep) {
-        Preconditions.checkArgument(!finalStep,
-            "Shouldn't use NOOP delegation if there is nothing to do afterwards.");
-        return StackManipulation.Trivial.INSTANCE;
-      }
-    },
-    INVOKE_PREPARE_FOR_PROCESSING {
-      private final Assigner assigner = Assigner.DEFAULT;
-
-      @Override
-      StackManipulation manipulation(
-          TypeDescription delegateType, MethodDescription instrumentedMethod, boolean finalStep) {
-        MethodDescription prepareMethod;
-        try {
-          prepareMethod = new MethodLocator.ForExplicitMethod(
-              new MethodDescription.ForLoadedMethod(
-                  DoFn.class.getDeclaredMethod("prepareForProcessing")))
-          .resolve(instrumentedMethod);
-        } catch (NoSuchMethodException | SecurityException e) {
-          throw new RuntimeException("Unable to locate prepareForProcessing method", e);
-        }
-
-        if (finalStep) {
-          return new StackManipulation.Compound(
-              // Invoke the prepare method
-              MethodInvoker.Simple.INSTANCE.invoke(prepareMethod),
-              // Return from the invokeStartBundle when we're done.
-              TerminationHandler.Returning.INSTANCE.resolve(
-                  assigner, instrumentedMethod, prepareMethod));
-        } else {
-          return new StackManipulation.Compound(
-              // Duplicate the delegation target so that it remains after we invoke prepare
-              Duplication.duplicate(delegateType),
-              // Invoke the prepare method
-              MethodInvoker.Simple.INSTANCE.invoke(prepareMethod),
-              // Drop the return value from prepareForProcessing
-              TerminationHandler.Dropping.INSTANCE.resolve(
-                  assigner, instrumentedMethod, prepareMethod));
-        }
-      }
-    };
-
-    /**
-     * Stack manipulation to perform prior to the delegate call.
-     *
-     * <ul>
-     * <li>Precondition: Stack has the delegate target on top of the stack
-     * <li>Postcondition: If finalStep is true, then we've returned from the method. Otherwise, the
-     * stack still has the delegate target on top of the stack.
-     * </ul>
-     *
-     * @param delegateType The type of the delegate target, in case it needs to be duplicated.
-     * @param instrumentedMethod The method bing instrumented. Necessary for resolving types and
-     *     other information.
-     * @param finalStep If true, return from the {@code invokeStartBundle} method after invoking
-     * {@code prepareForProcessing} on the delegate.
-     */
-    abstract StackManipulation manipulation(
-        TypeDescription delegateType, MethodDescription instrumentedMethod, boolean finalStep);
-  }
-
-  /**
-   * A byte-buddy {@link Implementation} that delegates a call that receives
-   * {@link AdditionalParameter} to the given {@link DoFn} method.
-   */
-  private static final class InvokerDelegation implements Implementation {
-    @Nullable
-    private final Method target;
-    private final BeforeDelegation before;
-    private final List<AdditionalParameter> args;
-    private final Assigner assigner = Assigner.DEFAULT;
-    private FieldDescription field;
-
-    /**
-     * Create the {@link InvokerDelegation} for the specified method.
-     *
-     * @param target the method to delegate to
-     * @param isStartBundle whether or not this is the {@code startBundle} call
-     * @param args the {@link AdditionalParameter} to be passed to the {@code target}
-     */
-    private InvokerDelegation(
-        @Nullable Method target,
-        BeforeDelegation before,
-        List<AdditionalParameter> args) {
-      this.target = target;
-      this.before = before;
-      this.args = args;
-    }
-
-    /**
-     * Generate the {@link Implementation} of one of the life-cycle methods of a
-     * {@link DoFn}.
-     */
-    private static Implementation create(
-        @Nullable final Method target, BeforeDelegation before, List<AdditionalParameter> args) {
-      if (target == null && before == BeforeDelegation.NOOP) {
-        // There is no target to call and nothing needs to happen before. Just produce a stub.
-        return StubMethod.INSTANCE;
-      } else {
-        // We need to generate a non-empty method implementation.
-        return new InvokerDelegation(target, before, args);
-      }
-    }
-
-    @Override
-    public InstrumentedType prepare(InstrumentedType instrumentedType) {
-      // Remember the field description of the instrumented type.
-      field = instrumentedType.getDeclaredFields()
-          .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME)).getOnly();
-
-      // Delegating the method call doesn't require any changes to the instrumented type.
-      return instrumentedType;
-    }
-
-    /**
-     * Stack manipulation to push the {@link DoFn} reference stored in the
-     * delegate field of the invoker on to the top of the stack.
-     *
-     * <p>This implementation is derived from the code for
-     * {@code MethodCall.invoke(m).onInstanceField(clazz, delegateField)} with two key differences.
-     * First, it doesn't add a synthetic field each time, which is critical to avoid duplicate field
-     * definitions. Second, it uses the {@link AdditionalParameter} to populate the arguments to the
-     * method.
-     */
-    private StackManipulation pushDelegateField() {
-      return new StackManipulation.Compound(
-          // Push "this" reference to the stack
-          MethodVariableAccess.REFERENCE.loadOffset(0),
-          // Access the delegate field of the the invoker
-          FieldAccess.forField(field).getter());
-    }
-
-    private StackManipulation pushArgument(
-        AdditionalParameter arg, MethodDescription instrumentedMethod) {
-      MethodDescription transform = arg.method;
-
-      return new StackManipulation.Compound(
-          // Push the ExtraContextFactory which must have been argument 2 of the instrumented method
-          MethodVariableAccess.REFERENCE.loadOffset(2),
-          // Invoke the appropriate method to produce the context argument
-          MethodInvocation.invoke(transform));
-    }
-
-    private StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
-      MethodDescription targetMethod = new MethodLocator.ForExplicitMethod(
-          new MethodDescription.ForLoadedMethod(target)).resolve(instrumentedMethod);
-      ParameterList<?> params = targetMethod.getParameters();
-
-      List<StackManipulation> parameters;
-      if (!params.isEmpty()) {
-        // Instructions to setup the parameters for the call
-        parameters = new ArrayList<>(args.size() + 1);
-        // 1. The first argument in the delegate method must be the context. This corresponds to
-        //    the first argument in the instrumented method, so copy that.
-        parameters.add(MethodVariableAccess.of(params.get(0).getType().getSuperClass())
-            .loadOffset(1));
-        // 2. For each of the extra arguments push the appropriate value.
-        for (AdditionalParameter arg : args) {
-          parameters.add(pushArgument(arg, instrumentedMethod));
-        }
-      } else {
-        parameters = Collections.emptyList();
-      }
-
-      return new StackManipulation.Compound(
-          // Push the parameters
-          new StackManipulation.Compound(parameters),
-          // Invoke the target method
-          wrapWithUserCodeException(MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
-          // Return from the instrumented method
-          TerminationHandler.Returning.INSTANCE.resolve(
-              assigner, instrumentedMethod, targetMethod));
-    }
-
-    /**
-     * Wrap a given stack manipulation in a try catch block. Any exceptions thrown within the
-     * try are wrapped with a {@link UserCodeException}.
-     */
-    private StackManipulation wrapWithUserCodeException(
-        final StackManipulation tryBody) {
-      final MethodDescription createUserCodeException;
-      try {
-        createUserCodeException = new MethodDescription.ForLoadedMethod(
-                UserCodeException.class.getDeclaredMethod("wrap", Throwable.class));
-      } catch (NoSuchMethodException | SecurityException e) {
-        throw new RuntimeException("Unable to find UserCodeException.wrap", e);
-      }
-
-      return new StackManipulation() {
-        @Override
-        public boolean isValid() {
-          return tryBody.isValid();
-        }
-
-        @Override
-        public Size apply(MethodVisitor mv, Context implementationContext) {
-          Label tryBlockStart = new Label();
-          Label tryBlockEnd = new Label();
-          Label catchBlockStart = new Label();
-          Label catchBlockEnd = new Label();
-
-          String throwableName =
-              new TypeDescription.ForLoadedType(Throwable.class).getInternalName();
-          mv.visitTryCatchBlock(tryBlockStart, tryBlockEnd, catchBlockStart, throwableName);
-
-          // The try block attempts to perform the expected operations, then jumps to success
-          mv.visitLabel(tryBlockStart);
-          Size trySize = tryBody.apply(mv, implementationContext);
-          mv.visitJumpInsn(Opcodes.GOTO, catchBlockEnd);
-          mv.visitLabel(tryBlockEnd);
-
-          // The handler wraps the exception, and then throws.
-          mv.visitLabel(catchBlockStart);
-          // Add the exception to the frame
-          mv.visitFrame(Opcodes.F_SAME1,
-              // No local variables
-              0, new Object[] {},
-              // 1 stack element (the throwable)
-              1, new Object[] { throwableName });
-
-          Size catchSize = new StackManipulation.Compound(
-              MethodInvocation.invoke(createUserCodeException),
-              Throw.INSTANCE)
-              .apply(mv, implementationContext);
-
-          mv.visitLabel(catchBlockEnd);
-          // The frame contents after the try/catch block is the same
-          // as it was before.
-          mv.visitFrame(Opcodes.F_SAME,
-              // No local variables
-              0, new Object[] {},
-              // No new stack variables
-              0, new Object[] {});
-
-          return new Size(
-              trySize.getSizeImpact(),
-              Math.max(trySize.getMaximalSize(), catchSize.getMaximalSize()));
-        }
-      };
-    }
-
-    @Override
-    public ByteCodeAppender appender(final Target implementationTarget) {
-      return new ByteCodeAppender() {
-        @Override
-        public Size apply(
-            MethodVisitor methodVisitor,
-            Context implementationContext,
-            MethodDescription instrumentedMethod) {
-          StackManipulation.Size size = new StackManipulation.Compound(
-              // Put the target on top of the stack
-              pushDelegateField(),
-              // Do any necessary pre-delegation work
-              before.manipulation(field.getType().asErasure(), instrumentedMethod, target == null),
-              // Invoke the target method, if there is one. If there wasn't, then isStartBundle was
-              // true, and we've already emitted the appropriate return instructions.
-              target != null
-                  ? invokeTargetMethod(instrumentedMethod)
-                  : StackManipulation.Trivial.INSTANCE)
-              .apply(methodVisitor, implementationContext);
-          return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
-        }
-      };
-    }
-  }
-
-  /**
-   * A constructor {@link Implementation} for a {@link DoFnInvoker class}. Produces the byte code
-   * for a constructor that takes a single argument and assigns it to the delegate field.
-   * {@link AdditionalParameter} to the given {@link DoFn} method.
-   */
-  private static final class InvokerConstructor implements Implementation {
-    @Override
-    public InstrumentedType prepare(InstrumentedType instrumentedType) {
-      return instrumentedType;
-    }
-
-    @Override
-    public ByteCodeAppender appender(final Target implementationTarget) {
-      return new ByteCodeAppender() {
-        @Override
-        public Size apply(
-            MethodVisitor methodVisitor,
-            Context implementationContext,
-            MethodDescription instrumentedMethod) {
-          StackManipulation.Size size = new StackManipulation.Compound(
-              // Load the this reference
-              MethodVariableAccess.REFERENCE.loadOffset(0),
-              // Invoke the super constructor (default constructor of Object)
-              MethodInvocation
-                  .invoke(new TypeDescription.ForLoadedType(Object.class)
-                    .getDeclaredMethods()
-                    .filter(ElementMatchers.isConstructor()
-                      .and(ElementMatchers.takesArguments(0)))
-                    .getOnly()),
-              // Load the this reference
-              MethodVariableAccess.REFERENCE.loadOffset(0),
-              // Load the delegate argument
-              MethodVariableAccess.REFERENCE.loadOffset(1),
-              // Assign the delegate argument to the delegate field
-              FieldAccess.forField(implementationTarget.getInstrumentedType()
-                  .getDeclaredFields()
-                  .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
-                  .getOnly()).putter(),
-              // Return void.
-              MethodReturn.VOID
-            ).apply(methodVisitor, implementationContext);
-            return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
-        }
-      };
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index dd1baab..4cd410a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -92,7 +92,7 @@ public class DoFnTester<InputT, OutputT> {
   @SuppressWarnings("unchecked")
   public static <InputT, OutputT> DoFnTester<InputT, OutputT>
       of(DoFn<InputT, OutputT> fn) {
-    return new DoFnTester<InputT, OutputT>(DoFnReflector.of(fn.getClass()).toDoFn(fn));
+    return new DoFnTester<InputT, OutputT>(DoFnAdapters.toOldDoFn(fn));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index aa57531..af500ba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -536,7 +536,7 @@ public class ParDo {
 
   private static <InputT, OutputT> OldDoFn<InputT, OutputT>
       adapt(DoFn<InputT, OutputT> fn) {
-    return DoFnReflector.of(fn.getClass()).toDoFn(fn);
+    return DoFnAdapters.toOldDoFn(fn);
   }
 
   /**
@@ -747,7 +747,7 @@ public class ParDo {
 
     @Override
     protected String getKindString() {
-      Class<?> clazz = DoFnReflector.getDoFnClass(fn);
+      Class<?> clazz = DoFnAdapters.getDoFnClass(fn);
       if (clazz.isAnonymousClass()) {
         return "AnonymousParDo";
       } else {
@@ -968,7 +968,7 @@ public class ParDo {
 
     @Override
     protected String getKindString() {
-      Class<?> clazz = DoFnReflector.getDoFnClass(fn);
+      Class<?> clazz = DoFnAdapters.getDoFnClass(fn);
       if (clazz.isAnonymousClass()) {
         return "AnonymousParMultiDo";
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
new file mode 100644
index 0000000..5818a59
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Interface for invoking the {@code DoFn} processing methods.
+ *
+ * <p>Instantiating a {@link DoFnInvoker} associates it with a specific {@link DoFn} instance,
+ * referred to as the bound {@link DoFn}.
+ */
+public interface DoFnInvoker<InputT, OutputT> {
+  /**
+   * Invoke the {@link DoFn.Setup} method on the bound {@link DoFn}.
+   */
+  void invokeSetup();
+
+  /**
+   * Invoke the {@link DoFn.StartBundle} method on the bound {@link DoFn}.
+   *
+   * @param c The {@link DoFn.Context} to invoke the fn with.
+   */
+  void invokeStartBundle(DoFn<InputT, OutputT>.Context c);
+
+  /**
+   * Invoke the {@link DoFn.FinishBundle} method on the bound {@link DoFn}.
+   *
+   * @param c The {@link DoFn.Context} to invoke the fn with.
+   */
+  void invokeFinishBundle(DoFn<InputT, OutputT>.Context c);
+
+  /**
+   * Invoke the {@link DoFn.Teardown} method on the bound {@link DoFn}.
+   */
+  void invokeTeardown();
+
+  /**
+   * Invoke the {@link DoFn.ProcessElement} method on the bound {@link DoFn}.
+   *
+   * @param c The {@link DoFn.ProcessContext} to invoke the fn with.
+   * @param extra Factory for producing extra parameter objects (such as window), if necessary.
+   */
+  void invokeProcessElement(
+      DoFn<InputT, OutputT>.ProcessContext c, DoFn.ExtraContextFactory<InputT, OutputT> extra);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
new file mode 100644
index 0000000..73874d7
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.DoFn.Setup;
+import org.apache.beam.sdk.transforms.DoFn.StartBundle;
+import org.apache.beam.sdk.transforms.DoFn.Teardown;
+import org.apache.beam.sdk.util.UserCodeException;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.NamingStrategy;
+import net.bytebuddy.description.field.FieldDescription;
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.description.modifier.FieldManifestation;
+import net.bytebuddy.description.modifier.Visibility;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.DynamicType;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.dynamic.scaffold.InstrumentedType;
+import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
+import net.bytebuddy.implementation.Implementation;
+import net.bytebuddy.implementation.MethodCall;
+import net.bytebuddy.implementation.bind.MethodDelegationBinder;
+import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
+import net.bytebuddy.implementation.bytecode.StackManipulation;
+import net.bytebuddy.implementation.bytecode.Throw;
+import net.bytebuddy.implementation.bytecode.member.FieldAccess;
+import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import net.bytebuddy.implementation.bytecode.member.MethodReturn;
+import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
+import net.bytebuddy.jar.asm.Label;
+import net.bytebuddy.jar.asm.MethodVisitor;
+import net.bytebuddy.jar.asm.Opcodes;
+import net.bytebuddy.matcher.ElementMatchers;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/** Dynamically generates {@link DoFnInvoker} instances for invoking a {@link DoFn}. */
+public class DoFnInvokers {
+  public static final DoFnInvokers INSTANCE = new DoFnInvokers();
+
+  private static final String FN_DELEGATE_FIELD_NAME = "delegate";
+
+  /**
+   * A cache of constructors of generated {@link DoFnInvoker} classes, keyed by {@link DoFn} class.
+   * Needed because generating an invoker class is expensive, and to avoid generating an excessive
+   * number of classes consuming PermGen memory.
+   */
+  private final Map<Class<?>, Constructor<?>> byteBuddyInvokerConstructorCache =
+      new LinkedHashMap<>();
+
+  private DoFnInvokers() {}
+
+  /** Creates a {@link DoFnInvoker} for {@code fn}, looking up the signature of its class. */
+  public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
+      DoFn<InputT, OutputT> fn) {
+    return newByteBuddyInvoker(DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass()), fn);
+  }
+
+  /** Creates a {@link DoFnInvoker} for {@code fn}; {@code signature} must be for its class. */
+  public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
+      DoFnSignature signature, DoFn<InputT, OutputT> fn) {
+    checkArgument(
+        signature.fnClass().equals(fn.getClass()),
+        "Signature is for class %s, but fn is of class %s",
+        signature.fnClass(),
+        fn.getClass());
+    try {
+      @SuppressWarnings("unchecked")
+      DoFnInvoker<InputT, OutputT> invoker =
+          (DoFnInvoker<InputT, OutputT>)
+              getOrGenerateByteBuddyInvokerConstructor(signature).newInstance(fn);
+      return invoker;
+    } catch (InstantiationException
+        | IllegalAccessException
+        | IllegalArgumentException
+        | InvocationTargetException
+        | SecurityException e) {
+      throw new RuntimeException("Unable to bind invoker for " + fn.getClass(), e);
+    }
+  }
+
+  /**
+   * Returns the constructor of the generated {@link DoFnInvoker} class for the signature's
+   * {@link DoFn} class, generating the class and caching its constructor on first request.
+   */
+  private synchronized Constructor<?> getOrGenerateByteBuddyInvokerConstructor(
+      DoFnSignature signature) {
+    Class<? extends DoFn> fnClass = signature.fnClass();
+    Constructor<?> constructor = byteBuddyInvokerConstructorCache.get(fnClass);
+    if (constructor == null) {
+      Class<? extends DoFnInvoker<?, ?>> invokerClass = generateInvokerClass(signature);
+      try {
+        constructor = invokerClass.getConstructor(fnClass);
+      } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
+        throw new RuntimeException(e);
+      }
+      byteBuddyInvokerConstructorCache.put(fnClass, constructor);
+    }
+    return constructor;
+  }
+
+  /** Generates and loads a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */
+  private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(DoFnSignature signature) {
+    Class<? extends DoFn> fnClass = signature.fnClass();
+
+    final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass);
+
+    DynamicType.Builder<?> builder =
+        new ByteBuddy()
+            // Name the generated class after the DoFn class rather than DoFnInvoker, presumably
+            // so that it can access private and package-private bits (TODO confirm)
+            .with(
+                new NamingStrategy.SuffixingRandom("auxiliary") {
+                  @Override
+                  public String subclass(TypeDescription.Generic superClass) {
+                    return super.name(clazzDescription);
+                  }
+                })
+            // Create a subclass of DoFnInvoker
+            .subclass(DoFnInvoker.class, ConstructorStrategy.Default.NO_CONSTRUCTORS)
+            .defineField(
+                FN_DELEGATE_FIELD_NAME, fnClass, Visibility.PRIVATE, FieldManifestation.FINAL)
+            .defineConstructor(Visibility.PUBLIC)
+            .withParameter(fnClass)
+            .intercept(new InvokerConstructor())
+            .method(ElementMatchers.named("invokeProcessElement"))
+            .intercept(new ProcessElementDelegation(signature.processElement()))
+            .method(ElementMatchers.named("invokeStartBundle"))
+            .intercept(
+                signature.startBundle() == null
+                    ? new NoopMethodImplementation()
+                    : new BundleMethodDelegation(signature.startBundle()))
+            .method(ElementMatchers.named("invokeFinishBundle"))
+            .intercept(
+                signature.finishBundle() == null
+                    ? new NoopMethodImplementation()
+                    : new BundleMethodDelegation(signature.finishBundle()))
+            .method(ElementMatchers.named("invokeSetup"))
+            .intercept(
+                signature.setup() == null
+                    ? new NoopMethodImplementation()
+                    : new LifecycleMethodDelegation(signature.setup()))
+            .method(ElementMatchers.named("invokeTeardown"))
+            .intercept(
+                signature.teardown() == null
+                    ? new NoopMethodImplementation()
+                    : new LifecycleMethodDelegation(signature.teardown()));
+
+    DynamicType.Unloaded<?> unloaded = builder.make();
+
+    @SuppressWarnings("unchecked")
+    Class<? extends DoFnInvoker<?, ?>> res =
+        (Class<? extends DoFnInvoker<?, ?>>)
+            unloaded
+                .load(DoFnInvokers.class.getClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+                .getLoaded();
+    return res;
+  }
+
+  /** Implements an invoker method whose generated body does nothing but return void. */
+  private static class NoopMethodImplementation implements Implementation {
+    @Override
+    public InstrumentedType prepare(InstrumentedType instrumentedType) {
+      return instrumentedType;
+    }
+
+    @Override
+    public ByteCodeAppender appender(final Target implementationTarget) {
+      return new ByteCodeAppender() {
+        @Override
+        public Size apply(
+            MethodVisitor methodVisitor,
+            Context implementationContext,
+            MethodDescription instrumentedMethod) {
+          StackManipulation manipulation = MethodReturn.VOID;
+          StackManipulation.Size size = manipulation.apply(methodVisitor, implementationContext);
+          return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
+        }
+      };
+    }
+  }
+
+  /**
+   * Base class for implementing an invoker method by delegating to a method of the target {@link
+   * DoFn}, which is read from the invoker's delegate field.
+   */
+  private abstract static class MethodDelegation implements Implementation {
+    FieldDescription delegateField;
+
+    @Override
+    public InstrumentedType prepare(InstrumentedType instrumentedType) {
+      // Remember the description of the delegate field declared on the instrumented type.
+      delegateField =
+          instrumentedType
+              .getDeclaredFields()
+              .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
+              .getOnly();
+
+      // Delegating the method call doesn't require any changes to the instrumented type.
+      return instrumentedType;
+    }
+
+    @Override
+    public ByteCodeAppender appender(final Target implementationTarget) {
+      return new ByteCodeAppender() {
+        @Override
+        public Size apply(
+            MethodVisitor methodVisitor,
+            Context implementationContext,
+            MethodDescription instrumentedMethod) {
+          StackManipulation manipulation =
+              new StackManipulation.Compound(
+                  // Push "this" reference to the stack
+                  MethodVariableAccess.REFERENCE.loadOffset(0),
+                  // Access the delegate field of the invoker
+                  FieldAccess.forField(delegateField).getter(),
+                  invokeTargetMethod(instrumentedMethod));
+          StackManipulation.Size size = manipulation.apply(methodVisitor, implementationContext);
+          return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
+        }
+      };
+    }
+
+    /**
+     * Generates code to invoke the target method. The returned manipulation is applied right
+     * after the delegate field has been loaded onto the stack, so it should push any necessary
+     * arguments and then perform the method invocation.
+     */
+    protected abstract StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod);
+  }
+
+  /**
+   * Implements the invoker's {@link DoFnInvoker#invokeProcessElement} method by delegating to the
+   * {@link DoFn.ProcessElement} method.
+   *
+   * <p>The generated code passes the invoker method's first argument straight through to the
+   * target method. Each extra parameter listed by the signature is obtained by calling the
+   * matching accessor ({@code window}, {@code inputProvider} or {@code outputReceiver}) on the
+   * invoker method's second argument, a {@link DoFn.ExtraContextFactory}. The accessors are
+   * looked up reflectively once, in the static initializer; a failed lookup is rethrown as a
+   * {@link RuntimeException}.
+   *
+   * <p>NOTE(review): a {@code Parameter} value with no entry in the accessor map would make the
+   * lookup in {@code invokeTargetMethod} return {@code null}; presumably the enum only has the
+   * three values registered here - confirm.
+   */
+  private static final class ProcessElementDelegation extends MethodDelegation {
+    private static final Map<DoFnSignature.ProcessElementMethod.Parameter, MethodDescription>
+        EXTRA_CONTEXT_FACTORY_METHODS;
+
+    static {
+      try {
+        Map<DoFnSignature.ProcessElementMethod.Parameter, MethodDescription> methods =
+            new EnumMap<>(DoFnSignature.ProcessElementMethod.Parameter.class);
+        methods.put(
+            DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW,
+            new MethodDescription.ForLoadedMethod(
+                DoFn.ExtraContextFactory.class.getMethod("window")));
+        methods.put(
+            DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER,
+            new MethodDescription.ForLoadedMethod(
+                DoFn.ExtraContextFactory.class.getMethod("inputProvider")));
+        methods.put(
+            DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER,
+            new MethodDescription.ForLoadedMethod(
+                DoFn.ExtraContextFactory.class.getMethod("outputReceiver")));
+        EXTRA_CONTEXT_FACTORY_METHODS = Collections.unmodifiableMap(methods);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "Failed to locate an ExtraContextFactory method that was expected to exist", e);
+      }
+    }
+
+    private final DoFnSignature.ProcessElementMethod signature;
+
+    /** Creates a delegation to the {@link ProcessElement} method described by {@code signature}. */
+    private ProcessElementDelegation(DoFnSignature.ProcessElementMethod signature) {
+      this.signature = signature;
+    }
+
+    @Override
+    protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
+      MethodDescription targetMethod =
+          new MethodCall.MethodLocator.ForExplicitMethod(
+                  new MethodDescription.ForLoadedMethod(signature.targetMethod()))
+              .resolve(instrumentedMethod);
+
+      // Parameters of the wrapper invoker method:
+      //   DoFn.ProcessContext, ExtraContextFactory.
+      // Parameters of the wrapped DoFn method:
+      //   DoFn.ProcessContext, [BoundedWindow, InputProvider, OutputReceiver] in any order
+      ArrayList<StackManipulation> parameters = new ArrayList<>();
+      // Push the ProcessContext argument (first argument of the invoker method).
+      parameters.add(MethodVariableAccess.REFERENCE.loadOffset(1));
+      // Push each extra argument, in the order given by signature.extraParameters().
+      StackManipulation pushExtraContextFactory = MethodVariableAccess.REFERENCE.loadOffset(2);
+      for (DoFnSignature.ProcessElementMethod.Parameter param : signature.extraParameters()) {
+        parameters.add(
+            new StackManipulation.Compound(
+                pushExtraContextFactory,
+                MethodInvocation.invoke(EXTRA_CONTEXT_FACTORY_METHODS.get(param))));
+      }
+
+      return new StackManipulation.Compound(
+          // Push the parameters
+          new StackManipulation.Compound(parameters),
+          // Invoke the target method; only the invocation itself sits inside the try block
+          wrapWithUserCodeException(
+              MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
+          // Return from the instrumented method
+          MethodReturn.VOID);
+    }
+  }
+
+  /**
+   * Implements {@link DoFnInvoker#invokeStartBundle} or {@link DoFnInvoker#invokeFinishBundle} by
+   * delegating respectively to the {@link StartBundle} and {@link FinishBundle} methods.
+   *
+   * <p>The generated code forwards the invoker method's first argument to the target method,
+   * wraps the invocation with {@code wrapWithUserCodeException}, and then returns void.
+   *
+   * <p>The constructor accepts a null signature, but the signature is null-checked when the
+   * method body is generated, so {@code invokeTargetMethod} must only be reached when a target
+   * method actually exists.
+   */
+  private static final class BundleMethodDelegation extends MethodDelegation {
+    private final DoFnSignature.BundleMethod signature;
+
+    private BundleMethodDelegation(@Nullable DoFnSignature.BundleMethod signature) {
+      this.signature = signature;
+    }
+
+    @Override
+    protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
+      MethodDescription targetMethod =
+          new MethodCall.MethodLocator.ForExplicitMethod(
+                  new MethodDescription.ForLoadedMethod(checkNotNull(signature).targetMethod()))
+              .resolve(instrumentedMethod);
+      return new StackManipulation.Compound(
+          // Push the only forwarded parameter: the first argument of the invoker method
+          MethodVariableAccess.REFERENCE.loadOffset(1),
+          // Invoke the target method; only the invocation itself sits inside the try block
+          wrapWithUserCodeException(
+              MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
+          MethodReturn.VOID);
+    }
+  }
+
+  /**
+   * Implements {@link DoFnInvoker#invokeSetup} or {@link DoFnInvoker#invokeTeardown} by delegating
+   * respectively to the {@link Setup} and {@link Teardown} methods.
+   *
+   * <p>No arguments are pushed: the generated code invokes the target method with only the
+   * delegate on the stack, wraps the invocation with {@code wrapWithUserCodeException}, and then
+   * returns void.
+   *
+   * <p>The constructor accepts a null signature, but the signature is null-checked when the
+   * method body is generated, so {@code invokeTargetMethod} must only be reached when a target
+   * method actually exists.
+   */
+  private static final class LifecycleMethodDelegation extends MethodDelegation {
+    private final DoFnSignature.LifecycleMethod signature;
+
+    private LifecycleMethodDelegation(@Nullable DoFnSignature.LifecycleMethod signature) {
+      this.signature = signature;
+    }
+
+    @Override
+    protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
+      MethodDescription targetMethod =
+          new MethodCall.MethodLocator.ForExplicitMethod(
+                  new MethodDescription.ForLoadedMethod(checkNotNull(signature).targetMethod()))
+              .resolve(instrumentedMethod);
+      return new StackManipulation.Compound(
+          wrapWithUserCodeException(
+              MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
+          MethodReturn.VOID);
+    }
+  }
+
+  /**
+   * Wraps a given stack manipulation in a try catch block. Any exceptions thrown within the try are
+   * wrapped with a {@link UserCodeException}.
+   *
+   * <p>The emitted code has this shape: the try body, a jump past the handler, then a handler for
+   * {@link Throwable} that passes the caught throwable to {@code UserCodeException.wrap} and
+   * throws the result. {@code UserCodeException.wrap(Throwable)} is located reflectively each time
+   * this method is called; a failed lookup is rethrown as a {@link RuntimeException}.
+   *
+   * <p>The returned manipulation is valid exactly when {@code tryBody} is. Its reported size
+   * impact is that of {@code tryBody}, and its reported maximal size is the larger of the try
+   * body's and the handler's.
+   *
+   * @param tryBody the manipulation to run inside the try block
+   * @return a manipulation emitting {@code tryBody} guarded by the catch-wrap-rethrow handler
+   */
+  private static StackManipulation wrapWithUserCodeException(final StackManipulation tryBody) {
+    final MethodDescription createUserCodeException;
+    try {
+      createUserCodeException =
+          new MethodDescription.ForLoadedMethod(
+              UserCodeException.class.getDeclaredMethod("wrap", Throwable.class));
+    } catch (NoSuchMethodException | SecurityException e) {
+      throw new RuntimeException("Unable to find UserCodeException.wrap", e);
+    }
+
+    return new StackManipulation() {
+      @Override
+      public boolean isValid() {
+        return tryBody.isValid();
+      }
+
+      @Override
+      public Size apply(MethodVisitor mv, Implementation.Context implementationContext) {
+        Label tryBlockStart = new Label();
+        Label tryBlockEnd = new Label();
+        Label catchBlockStart = new Label();
+        Label catchBlockEnd = new Label();
+
+        String throwableName = new TypeDescription.ForLoadedType(Throwable.class).getInternalName();
+        mv.visitTryCatchBlock(tryBlockStart, tryBlockEnd, catchBlockStart, throwableName);
+
+        // The try block performs the wrapped operations, then jumps past the handler on success
+        mv.visitLabel(tryBlockStart);
+        Size trySize = tryBody.apply(mv, implementationContext);
+        mv.visitJumpInsn(Opcodes.GOTO, catchBlockEnd);
+        mv.visitLabel(tryBlockEnd);
+
+        // The handler wraps the exception, and then throws.
+        mv.visitLabel(catchBlockStart);
+        // Add the exception to the frame
+        mv.visitFrame(
+            Opcodes.F_SAME1,
+            // Locals are unchanged from the previous frame, so none are listed
+            0,
+            new Object[] {},
+            // 1 stack element (the throwable)
+            1,
+            new Object[] {throwableName});
+
+        Size catchSize =
+            new Compound(MethodInvocation.invoke(createUserCodeException), Throw.INSTANCE)
+                .apply(mv, implementationContext);
+
+        mv.visitLabel(catchBlockEnd);
+        // The frame contents after the try/catch block is the same
+        // as it was before.
+        mv.visitFrame(
+            Opcodes.F_SAME,
+            // Locals are unchanged from the previous frame, so none are listed
+            0,
+            new Object[] {},
+            // No new stack variables
+            0,
+            new Object[] {});
+
+        return new Size(
+            trySize.getSizeImpact(),
+            Math.max(trySize.getMaximalSize(), catchSize.getMaximalSize()));
+      }
+    };
+  }
+
+  /**
+   * A constructor {@link Implementation} for a {@link DoFnInvoker class}. Produces the byte code
+   * for a constructor that takes a single argument and assigns it to the delegate field.
+   *
+   * <p>The generated constructor first calls the no-argument constructor of {@link Object}, then
+   * stores its first argument into the field of the instrumented type named
+   * {@code FN_DELEGATE_FIELD_NAME}, and returns. {@link #prepare} leaves the instrumented type
+   * untouched, so that field has to be defined elsewhere.
+   *
+   * <p>NOTE(review): both lookups end in {@code getOnly()}, which presumably requires exactly one
+   * match - confirm against the Byte Buddy documentation.
+   */
+  private static final class InvokerConstructor implements Implementation {
+    @Override
+    public InstrumentedType prepare(InstrumentedType instrumentedType) {
+      return instrumentedType;
+    }
+
+    @Override
+    public ByteCodeAppender appender(final Target implementationTarget) {
+      return new ByteCodeAppender() {
+        @Override
+        public Size apply(
+            MethodVisitor methodVisitor,
+            Context implementationContext,
+            MethodDescription instrumentedMethod) {
+          StackManipulation.Size size =
+              new StackManipulation.Compound(
+                      // Load the this reference
+                      MethodVariableAccess.REFERENCE.loadOffset(0),
+                      // Invoke the super constructor (default constructor of Object)
+                      MethodInvocation.invoke(
+                          new TypeDescription.ForLoadedType(Object.class)
+                              .getDeclaredMethods()
+                              .filter(
+                                  ElementMatchers.isConstructor()
+                                      .and(ElementMatchers.takesArguments(0)))
+                              .getOnly()),
+                      // Load the this reference again, as the target of the field write
+                      MethodVariableAccess.REFERENCE.loadOffset(0),
+                      // Load the delegate argument
+                      MethodVariableAccess.REFERENCE.loadOffset(1),
+                      // Assign the delegate argument to the delegate field
+                      FieldAccess.forField(
+                              implementationTarget
+                                  .getInstrumentedType()
+                                  .getDeclaredFields()
+                                  .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
+                                  .getOnly())
+                          .putter(),
+                      // Return void.
+                      MethodReturn.VOID)
+                  .apply(methodVisitor, implementationContext);
+          return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
+        }
+      };
+    }
+  }
+}