You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/08/17 23:05:26 UTC

[2/4] incubator-beam git commit: Rewrites DoFnReflector to go via DoFnSignature

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
new file mode 100644
index 0000000..6730140
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+import com.google.auto.value.AutoValue;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+/**
+ * Describes the signature of a {@link DoFn}, in particular, which features it uses, which extra
+ * context it requires, types of the input and output elements, etc.
+ *
+ * <p>See <a href="https://s.apache.org/a-new-dofn">A new DoFn</a>.
+ */
+@AutoValue
+public abstract class DoFnSignature {
+  public abstract Class<? extends DoFn> fnClass();
+
+  public abstract ProcessElementMethod processElement();
+
+  @Nullable
+  public abstract BundleMethod startBundle();
+
+  @Nullable
+  public abstract BundleMethod finishBundle();
+
+  @Nullable
+  public abstract LifecycleMethod setup();
+
+  @Nullable
+  public abstract LifecycleMethod teardown();
+
+  static DoFnSignature create(
+      Class<? extends DoFn> fnClass,
+      ProcessElementMethod processElement,
+      @Nullable BundleMethod startBundle,
+      @Nullable BundleMethod finishBundle,
+      @Nullable LifecycleMethod setup,
+      @Nullable LifecycleMethod teardown) {
+    return new AutoValue_DoFnSignature(
+        fnClass,
+        processElement,
+        startBundle,
+        finishBundle,
+        setup,
+        teardown);
+  }
+
+  /** Describes a {@link DoFn.ProcessElement} method. */
+  @AutoValue
+  public abstract static class ProcessElementMethod {
+    enum Parameter {
+      BOUNDED_WINDOW,
+      INPUT_PROVIDER,
+      OUTPUT_RECEIVER
+    }
+
+    public abstract Method targetMethod();
+
+    public abstract List<Parameter> extraParameters();
+
+    static ProcessElementMethod create(Method targetMethod, List<Parameter> extraParameters) {
+      return new AutoValue_DoFnSignature_ProcessElementMethod(
+          targetMethod, Collections.unmodifiableList(extraParameters));
+    }
+
+    /** @return true if the reflected {@link DoFn} uses a Single Window. */
+    public boolean usesSingleWindow() {
+      return extraParameters().contains(Parameter.BOUNDED_WINDOW);
+    }
+  }
+
+  /** Describes a {@link DoFn.StartBundle} or {@link DoFn.FinishBundle} method. */
+  @AutoValue
+  public abstract static class BundleMethod {
+    public abstract Method targetMethod();
+
+    static BundleMethod create(Method targetMethod) {
+      return new AutoValue_DoFnSignature_BundleMethod(targetMethod);
+    }
+  }
+
+  /** Describes a {@link DoFn.Setup} or {@link DoFn.Teardown} method. */
+  @AutoValue
+  public abstract static class LifecycleMethod {
+    public abstract Method targetMethod();
+
+    static LifecycleMethod create(Method targetMethod) {
+      return new AutoValue_DoFnSignature_LifecycleMethod(targetMethod);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
new file mode 100644
index 0000000..80b3b4f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.reflect.TypeParameter;
+import com.google.common.reflect.TypeToken;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parses a {@link DoFn} and computes its {@link DoFnSignature}. See {@link #getOrParseSignature}.
+ */
+public class DoFnSignatures {
+  public static final DoFnSignatures INSTANCE = new DoFnSignatures();
+
+  private DoFnSignatures() {}
+
+  private final Map<Class<?>, DoFnSignature> signatureCache = new LinkedHashMap<>();
+
+  /** @return the {@link DoFnSignature} for the given {@link DoFn}. */
+  public synchronized DoFnSignature getOrParseSignature(
+      @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) {
+    DoFnSignature signature = signatureCache.get(fn);
+    if (signature == null) {
+      signatureCache.put(fn, signature = parseSignature(fn));
+    }
+    return signature;
+  }
+
+  /** Analyzes a given {@link DoFn} class and extracts its {@link DoFnSignature}. */
+  private static DoFnSignature parseSignature(Class<? extends DoFn> fnClass) {
+    TypeToken<?> inputT = null;
+    TypeToken<?> outputT = null;
+
+    // Extract the input and output type.
+    checkArgument(
+        DoFn.class.isAssignableFrom(fnClass),
+        "%s must be subtype of DoFn",
+        fnClass.getSimpleName());
+    TypeToken<? extends DoFn> fnToken = TypeToken.of(fnClass);
+    for (TypeToken<?> supertype : fnToken.getTypes()) {
+      if (!supertype.getRawType().equals(DoFn.class)) {
+        continue;
+      }
+      Type[] args = ((ParameterizedType) supertype.getType()).getActualTypeArguments();
+      inputT = TypeToken.of(args[0]);
+      outputT = TypeToken.of(args[1]);
+    }
+    checkNotNull(inputT, "Unable to determine input type from %s", fnClass);
+
+    Method processElementMethod = findAnnotatedMethod(DoFn.ProcessElement.class, fnClass, true);
+    Method startBundleMethod = findAnnotatedMethod(DoFn.StartBundle.class, fnClass, false);
+    Method finishBundleMethod = findAnnotatedMethod(DoFn.FinishBundle.class, fnClass, false);
+    Method setupMethod = findAnnotatedMethod(DoFn.Setup.class, fnClass, false);
+    Method teardownMethod = findAnnotatedMethod(DoFn.Teardown.class, fnClass, false);
+
+    return DoFnSignature.create(
+        fnClass,
+        analyzeProcessElementMethod(fnToken, processElementMethod, inputT, outputT),
+        (startBundleMethod == null)
+            ? null
+            : analyzeBundleMethod(fnToken, startBundleMethod, inputT, outputT),
+        (finishBundleMethod == null)
+            ? null
+            : analyzeBundleMethod(fnToken, finishBundleMethod, inputT, outputT),
+        (setupMethod == null) ? null : analyzeLifecycleMethod(setupMethod),
+        (teardownMethod == null) ? null : analyzeLifecycleMethod(teardownMethod));
+  }
+
+  /**
+   * Generates a type token for {@code DoFn<InputT, OutputT>.ProcessContext} given {@code InputT}
+   * and {@code OutputT}.
+   */
+  private static <InputT, OutputT>
+      TypeToken<DoFn<InputT, OutputT>.ProcessContext> doFnProcessContextTypeOf(
+          TypeToken<InputT> inputT, TypeToken<OutputT> outputT) {
+    return new TypeToken<DoFn<InputT, OutputT>.ProcessContext>() {}.where(
+            new TypeParameter<InputT>() {}, inputT)
+        .where(new TypeParameter<OutputT>() {}, outputT);
+  }
+
+  /**
+   * Generates a type token for {@code DoFn<InputT, OutputT>.Context} given {@code InputT} and
+   * {@code OutputT}.
+   */
+  private static <InputT, OutputT> TypeToken<DoFn<InputT, OutputT>.Context> doFnContextTypeOf(
+      TypeToken<InputT> inputT, TypeToken<OutputT> outputT) {
+    return new TypeToken<DoFn<InputT, OutputT>.Context>() {}.where(
+            new TypeParameter<InputT>() {}, inputT)
+        .where(new TypeParameter<OutputT>() {}, outputT);
+  }
+
+  /** Generates a type token for {@code DoFn.InputProvider<InputT>} given {@code InputT}. */
+  private static <InputT> TypeToken<DoFn.InputProvider<InputT>> inputProviderTypeOf(
+      TypeToken<InputT> inputT) {
+    return new TypeToken<DoFn.InputProvider<InputT>>() {}.where(
+        new TypeParameter<InputT>() {}, inputT);
+  }
+
+  /** Generates a type token for {@code DoFn.OutputReceiver<OutputT>} given {@code OutputT}. */
+  private static <OutputT> TypeToken<DoFn.OutputReceiver<OutputT>> outputReceiverTypeOf(
+      TypeToken<OutputT> inputT) {
+    return new TypeToken<DoFn.OutputReceiver<OutputT>>() {}.where(
+        new TypeParameter<OutputT>() {}, inputT);
+  }
+
+  @VisibleForTesting
+  static DoFnSignature.ProcessElementMethod analyzeProcessElementMethod(
+      TypeToken<? extends DoFn> fnClass, Method m, TypeToken<?> inputT, TypeToken<?> outputT) {
+    checkArgument(
+        void.class.equals(m.getReturnType()), "%s must have a void return type", format(m));
+    checkArgument(!m.isVarArgs(), "%s must not have var args", format(m));
+
+    TypeToken<?> processContextToken = doFnProcessContextTypeOf(inputT, outputT);
+
+    Type[] params = m.getGenericParameterTypes();
+    TypeToken<?> contextToken = null;
+    if (params.length > 0) {
+      contextToken = fnClass.resolveType(params[0]);
+    }
+    checkArgument(
+        contextToken != null && contextToken.equals(processContextToken),
+        "%s must take a %s as its first argument",
+        format(m),
+        formatType(processContextToken));
+
+    List<DoFnSignature.ProcessElementMethod.Parameter> extraParameters = new ArrayList<>();
+    TypeToken<?> expectedInputProviderT = inputProviderTypeOf(inputT);
+    TypeToken<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
+    for (int i = 1; i < params.length; ++i) {
+      TypeToken<?> param = fnClass.resolveType(params[i]);
+      Class<?> rawType = param.getRawType();
+      if (rawType.equals(BoundedWindow.class)) {
+        checkArgument(
+            !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW),
+            "Multiple BoundedWindow parameters in %s",
+            format(m));
+        extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW);
+      } else if (rawType.equals(DoFn.InputProvider.class)) {
+        checkArgument(
+            !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER),
+            "Multiple InputProvider parameters in %s",
+            format(m));
+        checkArgument(
+            param.equals(expectedInputProviderT),
+            "Wrong type of InputProvider parameter for method %s: %s, should be %s",
+            format(m),
+            formatType(param),
+            formatType(expectedInputProviderT));
+        extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER);
+      } else if (rawType.equals(DoFn.OutputReceiver.class)) {
+        checkArgument(
+            !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER),
+            "Multiple OutputReceiver parameters in %s",
+            format(m));
+        checkArgument(
+            param.equals(expectedOutputReceiverT),
+            "Wrong type of OutputReceiver parameter for method %s: %s, should be %s",
+            format(m),
+            formatType(param),
+            formatType(expectedOutputReceiverT));
+        extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER);
+      } else {
+        List<String> allowedParamTypes =
+            Arrays.asList(formatType(new TypeToken<BoundedWindow>() {}));
+        checkArgument(
+            false,
+            "%s is not a valid context parameter for method %s. Should be one of %s",
+            formatType(param),
+            format(m),
+            allowedParamTypes);
+      }
+    }
+
+    return DoFnSignature.ProcessElementMethod.create(m, extraParameters);
+  }
+
+  @VisibleForTesting
+  static DoFnSignature.BundleMethod analyzeBundleMethod(
+      TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT, TypeToken<?> outputT) {
+    checkArgument(
+        void.class.equals(m.getReturnType()), "%s must have a void return type", format(m));
+    checkArgument(!m.isVarArgs(), "%s must not have var args", format(m));
+
+    TypeToken<?> expectedContextToken = doFnContextTypeOf(inputT, outputT);
+
+    Type[] params = m.getGenericParameterTypes();
+    checkArgument(
+        params.length == 1,
+        "%s must have a single argument of type %s",
+        format(m),
+        formatType(expectedContextToken));
+    TypeToken<?> contextToken = fnToken.resolveType(params[0]);
+    checkArgument(
+        contextToken.equals(expectedContextToken),
+        "Wrong type of context argument to %s: %s, must be %s",
+        format(m),
+        formatType(contextToken),
+        formatType(expectedContextToken));
+
+    return DoFnSignature.BundleMethod.create(m);
+  }
+
+  private static DoFnSignature.LifecycleMethod analyzeLifecycleMethod(Method m) {
+    checkArgument(
+        void.class.equals(m.getReturnType()), "%s must have a void return type", format(m));
+    checkArgument(
+        m.getGenericParameterTypes().length == 0, "%s must take zero arguments", format(m));
+    return DoFnSignature.LifecycleMethod.create(m);
+  }
+
+  private static Collection<Method> declaredMethodsWithAnnotation(
+      Class<? extends Annotation> anno, Class<?> startClass, Class<?> stopClass) {
+    Collection<Method> matches = new ArrayList<>();
+
+    Class<?> clazz = startClass;
+    LinkedHashSet<Class<?>> interfaces = new LinkedHashSet<>();
+
+    // First, find all declared methods on the startClass and parents (up to stopClass)
+    while (clazz != null && !clazz.equals(stopClass)) {
+      for (Method method : clazz.getDeclaredMethods()) {
+        if (method.isAnnotationPresent(anno)) {
+          matches.add(method);
+        }
+      }
+
+      Collections.addAll(interfaces, clazz.getInterfaces());
+
+      clazz = clazz.getSuperclass();
+    }
+
+    // Now, iterate over all the discovered interfaces
+    for (Method method : ReflectHelpers.getClosureOfMethodsOnInterfaces(interfaces)) {
+      if (method.isAnnotationPresent(anno)) {
+        matches.add(method);
+      }
+    }
+    return matches;
+  }
+
+  private static Method findAnnotatedMethod(
+      Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
+    Collection<Method> matches = declaredMethodsWithAnnotation(anno, fnClazz, DoFn.class);
+
+    if (matches.size() == 0) {
+      checkArgument(
+          !required,
+          "No method annotated with @%s found in %s",
+          anno.getSimpleName(),
+          fnClazz.getName());
+      return null;
+    }
+
+    // If we have at least one match, then either it should be the only match
+    // or it should be an extension of the other matches (which came from parent
+    // classes).
+    Method first = matches.iterator().next();
+    for (Method other : matches) {
+      checkArgument(
+          first.getName().equals(other.getName())
+              && Arrays.equals(first.getParameterTypes(), other.getParameterTypes()),
+          "Found multiple methods annotated with @%s. [%s] and [%s]",
+          anno.getSimpleName(),
+          format(first),
+          format(other));
+    }
+
+    // We need to be able to call it. We require it is public.
+    checkArgument(
+        (first.getModifiers() & Modifier.PUBLIC) != 0, "%s must be public", format(first));
+
+    // And make sure its not static.
+    checkArgument(
+        (first.getModifiers() & Modifier.STATIC) == 0, "%s must not be static", format(first));
+
+    return first;
+  }
+
+  private static String format(Method m) {
+    return ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(m);
+  }
+
+  private static String formatType(TypeToken<?> t) {
+    return ReflectHelpers.TYPE_SIMPLE_DESCRIPTION.apply(t.getType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
new file mode 100644
index 0000000..4df5209
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines reflection-based utilities for analyzing {@link org.apache.beam.sdk.transforms.DoFn}'s
+ * and creating {@link org.apache.beam.sdk.transforms.reflect.DoFnSignature}'s and
+ * {@link org.apache.beam.sdk.transforms.reflect.DoFnInvoker}'s from them.
+ */
+package org.apache.beam.sdk.transforms.reflect;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
deleted file mode 100644
index e05e5e2..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
+++ /dev/null
@@ -1,822 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.DoFn.Context;
-import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
-import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
-import org.apache.beam.sdk.transforms.DoFn.Setup;
-import org.apache.beam.sdk.transforms.DoFn.Teardown;
-import org.apache.beam.sdk.transforms.dofnreflector.DoFnReflectorTestHelper;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.UserCodeException;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-import java.lang.reflect.Method;
-
-/**
- * Tests for {@link DoFnReflector}.
- */
-@RunWith(JUnit4.class)
-public class DoFnReflectorTest {
-
-  /**
-   * A convenience struct holding flags that indicate whether a particular method was invoked.
-   */
-  public static class Invocations {
-    public boolean wasProcessElementInvoked = false;
-    public boolean wasStartBundleInvoked = false;
-    public boolean wasFinishBundleInvoked = false;
-    public boolean wasSetupInvoked = false;
-    public boolean wasTeardownInvoked = false;
-    private final String name;
-
-    public Invocations(String name) {
-      this.name = name;
-    }
-  }
-
-  private DoFn<String, String> fn;
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Mock
-  private DoFn<String, String>.ProcessContext mockContext;
-  @Mock
-  private BoundedWindow mockWindow;
-  @Mock
-  private DoFn.InputProvider<String> mockInputProvider;
-  @Mock
-  private DoFn.OutputReceiver<String> mockOutputReceiver;
-
-  private ExtraContextFactory<String, String> extraContextFactory;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-    this.extraContextFactory = new ExtraContextFactory<String, String>() {
-      @Override
-      public BoundedWindow window() {
-        return mockWindow;
-      }
-
-      @Override
-      public DoFn.InputProvider<String> inputProvider() {
-        return mockInputProvider;
-      }
-
-      @Override
-      public DoFn.OutputReceiver<String> outputReceiver() {
-        return mockOutputReceiver;
-      }
-    };
-  }
-
-  private DoFnReflector underTest(DoFn<String, String> fn) {
-    this.fn = fn;
-    return DoFnReflector.of(fn.getClass());
-  }
-
-  private void checkInvokeProcessElementWorks(
-      DoFnReflector r, Invocations... invocations) throws Exception {
-    assertTrue("Need at least one invocation to check", invocations.length >= 1);
-    for (Invocations invocation : invocations) {
-      assertFalse("Should not yet have called processElement on " + invocation.name,
-          invocation.wasProcessElementInvoked);
-    }
-    r.bindInvoker(fn).invokeProcessElement(mockContext, extraContextFactory);
-    for (Invocations invocation : invocations) {
-      assertTrue("Should have called processElement on " + invocation.name,
-          invocation.wasProcessElementInvoked);
-    }
-  }
-
-  private void checkInvokeStartBundleWorks(
-      DoFnReflector r, Invocations... invocations) throws Exception {
-    assertTrue("Need at least one invocation to check", invocations.length >= 1);
-    for (Invocations invocation : invocations) {
-      assertFalse("Should not yet have called startBundle on " + invocation.name,
-          invocation.wasStartBundleInvoked);
-    }
-    r.bindInvoker(fn).invokeStartBundle(mockContext, extraContextFactory);
-    for (Invocations invocation : invocations) {
-      assertTrue("Should have called startBundle on " + invocation.name,
-          invocation.wasStartBundleInvoked);
-    }
-  }
-
-  private void checkInvokeFinishBundleWorks(
-      DoFnReflector r, Invocations... invocations) throws Exception {
-    assertTrue("Need at least one invocation to check", invocations.length >= 1);
-    for (Invocations invocation : invocations) {
-      assertFalse("Should not yet have called finishBundle on " + invocation.name,
-          invocation.wasFinishBundleInvoked);
-    }
-    r.bindInvoker(fn).invokeFinishBundle(mockContext, extraContextFactory);
-    for (Invocations invocation : invocations) {
-      assertTrue("Should have called finishBundle on " + invocation.name,
-          invocation.wasFinishBundleInvoked);
-    }
-  }
-
-  private void checkInvokeSetupWorks(DoFnReflector r, Invocations... invocations) throws Exception {
-    assertTrue("Need at least one invocation to check", invocations.length >= 1);
-    for (Invocations invocation : invocations) {
-      assertFalse("Should not yet have called setup on " + invocation.name,
-          invocation.wasSetupInvoked);
-    }
-    r.bindInvoker(fn).invokeSetup();
-    for (Invocations invocation : invocations) {
-      assertTrue("Should have called setup on " + invocation.name,
-          invocation.wasSetupInvoked);
-    }
-  }
-
-  private void checkInvokeTeardownWorks(DoFnReflector r, Invocations... invocations)
-      throws Exception {
-    assertTrue("Need at least one invocation to check", invocations.length >= 1);
-    for (Invocations invocation : invocations) {
-      assertFalse("Should not yet have called teardown on " + invocation.name,
-          invocation.wasTeardownInvoked);
-    }
-    r.bindInvoker(fn).invokeTeardown();
-    for (Invocations invocation : invocations) {
-      assertTrue("Should have called teardown on " + invocation.name,
-          invocation.wasTeardownInvoked);
-    }
-  }
-
-  @Test
-  public void testDoFnWithNoExtraContext() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFnReflector reflector = underTest(new DoFn<String, String>() {
-
-      @ProcessElement
-      public void processElement(ProcessContext c)
-          throws Exception {
-        invocations.wasProcessElementInvoked = true;
-        assertSame(c, mockContext);
-      }
-    });
-
-    assertFalse(reflector.usesSingleWindow());
-
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testDoFnInvokersReused() throws Exception {
-    // Ensures that we don't create a new Invoker class for every instance of the OldDoFn.
-    IdentityParent fn1 = new IdentityParent();
-    IdentityParent fn2 = new IdentityParent();
-    DoFnReflector reflector1 = underTest(fn1);
-    DoFnReflector reflector2 = underTest(fn2);
-    assertSame("DoFnReflector instances should be cached and reused for identical types",
-        reflector1, reflector2);
-    assertSame("Invoker classes should only be generated once for each type",
-        reflector1.bindInvoker(fn1).getClass(),
-        reflector2.bindInvoker(fn2).getClass());
-  }
-
-  interface InterfaceWithProcessElement {
-    @ProcessElement
-    void processElement(DoFn<String, String>.ProcessContext c);
-  }
-
-  interface LayersOfInterfaces extends InterfaceWithProcessElement {}
-
-  private class IdentityUsingInterfaceWithProcessElement
-      extends DoFn<String, String>
-      implements LayersOfInterfaces {
-
-    private Invocations invocations = new Invocations("Named Class");
-
-    @Override
-    public void processElement(DoFn<String, String>.ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-      assertSame(c, mockContext);
-    }
-  }
-
-  @Test
-  public void testDoFnWithProcessElementInterface() throws Exception {
-    IdentityUsingInterfaceWithProcessElement fn = new IdentityUsingInterfaceWithProcessElement();
-    DoFnReflector reflector = underTest(fn);
-    assertFalse(reflector.usesSingleWindow());
-    checkInvokeProcessElementWorks(reflector, fn.invocations);
-  }
-
-  private class IdentityParent extends DoFn<String, String> {
-    protected Invocations parentInvocations = new Invocations("IdentityParent");
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      parentInvocations.wasProcessElementInvoked = true;
-      assertSame(c, mockContext);
-    }
-  }
-
-  private class IdentityChildWithoutOverride extends IdentityParent {
-  }
-
-  private class IdentityChildWithOverride extends IdentityParent {
-    protected Invocations childInvocations = new Invocations("IdentityChildWithOverride");
-
-    @Override
-    public void process(DoFn<String, String>.ProcessContext c) {
-      super.process(c);
-      childInvocations.wasProcessElementInvoked = true;
-    }
-  }
-
-  @Test
-  public void testDoFnWithMethodInSuperclass() throws Exception {
-    IdentityChildWithoutOverride fn = new IdentityChildWithoutOverride();
-    DoFnReflector reflector = underTest(fn);
-    assertFalse(reflector.usesSingleWindow());
-    checkInvokeProcessElementWorks(reflector, fn.parentInvocations);
-  }
-
-  @Test
-  public void testDoFnWithMethodInSubclass() throws Exception {
-    IdentityChildWithOverride fn = new IdentityChildWithOverride();
-    DoFnReflector reflector = underTest(fn);
-    assertFalse(reflector.usesSingleWindow());
-    checkInvokeProcessElementWorks(reflector, fn.parentInvocations, fn.childInvocations);
-  }
-
-  @Test
-  public void testDoFnWithWindow() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFnReflector reflector = underTest(new DoFn<String, String>() {
-
-      @ProcessElement
-      public void processElement(ProcessContext c, BoundedWindow w)
-          throws Exception {
-        invocations.wasProcessElementInvoked = true;
-        assertSame(c, mockContext);
-        assertSame(w, mockWindow);
-      }
-    });
-
-    assertTrue(reflector.usesSingleWindow());
-
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testDoFnWithOutputReceiver() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFnReflector reflector = underTest(new DoFn<String, String>() {
-
-      @ProcessElement
-      public void processElement(ProcessContext c, DoFn.OutputReceiver<String> o)
-          throws Exception {
-        invocations.wasProcessElementInvoked = true;
-        assertSame(c, mockContext);
-        assertSame(o, mockOutputReceiver);
-      }
-    });
-
-    assertFalse(reflector.usesSingleWindow());
-
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testDoFnWithInputProvider() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFnReflector reflector = underTest(new DoFn<String, String>() {
-
-      @ProcessElement
-      public void processElement(ProcessContext c, DoFn.InputProvider<String> i)
-          throws Exception {
-        invocations.wasProcessElementInvoked = true;
-        assertSame(c, mockContext);
-        assertSame(i, mockInputProvider);
-      }
-    });
-
-    assertFalse(reflector.usesSingleWindow());
-
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testDoFnWithStartBundle() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFnReflector reflector = underTest(new DoFn<String, String>() {
-      @ProcessElement
-      public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
-
-      @StartBundle
-      public void startBundle(Context c) {
-        invocations.wasStartBundleInvoked = true;
-        assertSame(c, mockContext);
-      }
-
-      @FinishBundle
-      public void finishBundle(Context c) {
-        invocations.wasFinishBundleInvoked = true;
-        assertSame(c, mockContext);
-      }
-    });
-
-    checkInvokeStartBundleWorks(reflector, invocations);
-    checkInvokeFinishBundleWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testDoFnWithSetupTeardown() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFnReflector reflector = underTest(new DoFn<String, String>() {
-      @ProcessElement
-      public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
-
-      @StartBundle
-      public void startBundle(Context c) {
-        invocations.wasStartBundleInvoked = true;
-        assertSame(c, mockContext);
-      }
-
-      @FinishBundle
-      public void finishBundle(Context c) {
-        invocations.wasFinishBundleInvoked = true;
-        assertSame(c, mockContext);
-      }
-
-      @Setup
-      public void before() {
-        invocations.wasSetupInvoked = true;
-      }
-
-      @Teardown
-      public void after() {
-        invocations.wasTeardownInvoked = true;
-      }
-    });
-
-    checkInvokeSetupWorks(reflector, invocations);
-    checkInvokeTeardownWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testNoProcessElement() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("No method annotated with @ProcessElement found");
-    thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFn<String, String>() {});
-  }
-
-  @Test
-  public void testMultipleProcessElement() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Found multiple methods annotated with @ProcessElement");
-    thrown.expectMessage("foo()");
-    thrown.expectMessage("bar()");
-    thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFn<String, String>() {
-      @ProcessElement
-      public void foo() {}
-
-      @ProcessElement
-      public void bar() {}
-    });
-  }
-
-  @Test
-  public void testMultipleStartBundleElement() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Found multiple methods annotated with @StartBundle");
-    thrown.expectMessage("bar()");
-    thrown.expectMessage("baz()");
-    thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFn<String, String>() {
-      @ProcessElement
-      public void foo() {}
-
-      @StartBundle
-      public void bar() {}
-
-      @StartBundle
-      public void baz() {}
-    });
-  }
-
-  @Test
-  public void testMultipleFinishBundleElement() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Found multiple methods annotated with @FinishBundle");
-    thrown.expectMessage("bar()");
-    thrown.expectMessage("baz()");
-    thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFn<String, String>() {
-      @ProcessElement
-      public void foo() {}
-
-      @FinishBundle
-      public void bar() {}
-
-      @FinishBundle
-      public void baz() {}
-    });
-  }
-
-  private static class PrivateDoFnClass extends DoFn<String, String> {
-    final Invocations invocations = new Invocations(getClass().getName());
-
-    @ProcessElement
-    public void processThis(ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-    }
-  }
-
-  @Test
-  public void testLocalPrivateDoFnClass() throws Exception {
-    PrivateDoFnClass fn = new PrivateDoFnClass();
-    DoFnReflector reflector = underTest(fn);
-    checkInvokeProcessElementWorks(reflector, fn.invocations);
-  }
-
-  @Test
-  public void testStaticPackagePrivateDoFnClass() throws Exception {
-    Invocations invocations = new Invocations("StaticPackagePrivateDoFn");
-    DoFnReflector reflector =
-        underTest(DoFnReflectorTestHelper.newStaticPackagePrivateDoFn(invocations));
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testInnerPackagePrivateDoFnClass() throws Exception {
-    Invocations invocations = new Invocations("InnerPackagePrivateDoFn");
-    DoFnReflector reflector =
-        underTest(new DoFnReflectorTestHelper().newInnerPackagePrivateDoFn(invocations));
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testStaticPrivateDoFnClass() throws Exception {
-    Invocations invocations = new Invocations("StaticPrivateDoFn");
-    DoFnReflector reflector = underTest(DoFnReflectorTestHelper.newStaticPrivateDoFn(invocations));
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testInnerPrivateDoFnClass() throws Exception {
-    Invocations invocations = new Invocations("StaticInnerDoFn");
-    DoFnReflector reflector =
-        underTest(new DoFnReflectorTestHelper().newInnerPrivateDoFn(invocations));
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testAnonymousInnerDoFnInOtherPackage() throws Exception {
-    Invocations invocations = new Invocations("AnonymousInnerDoFnInOtherPackage");
-    DoFnReflector reflector =
-        underTest(new DoFnReflectorTestHelper().newInnerAnonymousDoFn(invocations));
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testStaticAnonymousDoFnInOtherPackage() throws Exception {
-    Invocations invocations = new Invocations("AnonymousStaticDoFnInOtherPackage");
-    DoFnReflector reflector =
-        underTest(DoFnReflectorTestHelper.newStaticAnonymousDoFn(invocations));
-    checkInvokeProcessElementWorks(reflector, invocations);
-  }
-
-  @Test
-  public void testPrivateProcessElement() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("process() must be public");
-    thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFn<String, String>() {
-      @ProcessElement
-      private void process() {}
-    });
-  }
-
-  @Test
-  public void testPrivateStartBundle() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("startBundle() must be public");
-    thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFn<String, String>() {
-      @ProcessElement
-      public void processElement() {}
-
-      @StartBundle
-      void startBundle() {}
-    });
-  }
-
-  @Test
-  public void testPrivateFinishBundle() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("finishBundle() must be public");
-    thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFn<String, String>() {
-      @ProcessElement
-      public void processElement() {}
-
-      @FinishBundle
-      void finishBundle() {}
-    });
-  }
-
-  @SuppressWarnings({"unused"})
-  private void missingProcessContext() {}
-
-  @Test
-  public void testMissingProcessContext() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(getClass().getName()
-        + "#missingProcessContext() must take a ProcessContext as its first argument");
-
-    DoFnReflector.verifyProcessMethodArguments(
-        getClass().getDeclaredMethod("missingProcessContext"));
-  }
-
-  @SuppressWarnings({"unused"})
-  private void badProcessContext(String s) {}
-
-  @Test
-  public void testBadProcessContextType() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(getClass().getName()
-        + "#badProcessContext(String) must take a ProcessContext as its first argument");
-
-    DoFnReflector.verifyProcessMethodArguments(
-        getClass().getDeclaredMethod("badProcessContext", String.class));
-  }
-
-  @SuppressWarnings({"unused"})
-  private void badExtraContext(DoFn<Integer, String>.Context c, int n) {}
-
-  @Test
-  public void testBadExtraContext() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        "int is not a valid context parameter for method "
-        + getClass().getName() + "#badExtraContext(Context, int). Should be one of [");
-
-    DoFnReflector.verifyBundleMethodArguments(
-        getClass().getDeclaredMethod("badExtraContext", Context.class, int.class));
-  }
-
-  @SuppressWarnings({"unused"})
-  private void badExtraProcessContext(
-      DoFn<Integer, String>.ProcessContext c, Integer n) {}
-
-  @Test
-  public void testBadExtraProcessContextType() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        "Integer is not a valid context parameter for method "
-        + getClass().getName() + "#badExtraProcessContext(ProcessContext, Integer)"
-        + ". Should be one of [BoundedWindow]");
-
-    DoFnReflector.verifyProcessMethodArguments(
-        getClass().getDeclaredMethod("badExtraProcessContext",
-            ProcessContext.class, Integer.class));
-  }
-
-  @SuppressWarnings("unused")
-  private int badReturnType() {
-    return 0;
-  }
-
-  @Test
-  public void testBadReturnType() throws Exception {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(getClass().getName() + "#badReturnType() must have a void return type");
-
-    DoFnReflector.verifyProcessMethodArguments(getClass().getDeclaredMethod("badReturnType"));
-  }
-
-  @SuppressWarnings("unused")
-  private void goodGenerics(
-      DoFn<Integer, String>.ProcessContext c,
-      DoFn.InputProvider<Integer> input,
-      DoFn.OutputReceiver<String> output) {}
-
-  @Test
-  public void testValidGenerics() throws Exception {
-    Method method =
-        getClass()
-            .getDeclaredMethod(
-                "goodGenerics",
-                DoFn.ProcessContext.class,
-                DoFn.InputProvider.class,
-                DoFn.OutputReceiver.class);
-    DoFnReflector.verifyProcessMethodArguments(method);
-  }
-
-  @SuppressWarnings("unused")
-  private void goodWildcards(
-      DoFn<Integer, String>.ProcessContext c,
-      DoFn.InputProvider<?> input,
-      DoFn.OutputReceiver<?> output) {}
-
-  @Test
-  public void testGoodWildcards() throws Exception {
-    Method method =
-        getClass()
-            .getDeclaredMethod(
-                "goodWildcards",
-                DoFn.ProcessContext.class,
-                DoFn.InputProvider.class,
-                DoFn.OutputReceiver.class);
-    DoFnReflector.verifyProcessMethodArguments(method);
-  }
-
-  @SuppressWarnings("unused")
-  private void goodBoundedWildcards(
-      DoFn<Integer, String>.ProcessContext c,
-      DoFn.InputProvider<? super Integer> input,
-      DoFn.OutputReceiver<? super String> output) {}
-
-  @Test
-  public void testGoodBoundedWildcards() throws Exception {
-    Method method =
-        getClass()
-            .getDeclaredMethod(
-                "goodBoundedWildcards",
-                DoFn.ProcessContext.class,
-                DoFn.InputProvider.class,
-                DoFn.OutputReceiver.class);
-    DoFnReflector.verifyProcessMethodArguments(method);
-  }
-
-  @SuppressWarnings("unused")
-  private <InputT, OutputT> void goodTypeVariables(
-      DoFn<InputT, OutputT>.ProcessContext c,
-      DoFn.InputProvider<InputT> input,
-      DoFn.OutputReceiver<OutputT> output) {}
-
-  @Test
-  public void testGoodTypeVariables() throws Exception {
-    Method method =
-        getClass()
-            .getDeclaredMethod(
-                "goodTypeVariables",
-                DoFn.ProcessContext.class,
-                DoFn.InputProvider.class,
-                DoFn.OutputReceiver.class);
-    DoFnReflector.verifyProcessMethodArguments(method);
-  }
-
-  @SuppressWarnings("unused")
-  private void badGenericTwoArgs(
-      DoFn<Integer, String>.ProcessContext c,
-      DoFn.InputProvider<Integer> input,
-      DoFn.OutputReceiver<Integer> output) {}
-
-  @Test
-  public void testBadGenericsTwoArgs() throws Exception {
-    Method method =
-        getClass()
-            .getDeclaredMethod(
-                "badGenericTwoArgs",
-                DoFn.ProcessContext.class,
-                DoFn.InputProvider.class,
-                DoFn.OutputReceiver.class);
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Incompatible generics in context parameter "
-        + "OutputReceiver<Integer> "
-        + "for method " + getClass().getName()
-        + "#badGenericTwoArgs(ProcessContext, InputProvider, OutputReceiver). Should be "
-        + "OutputReceiver<String>");
-
-    DoFnReflector.verifyProcessMethodArguments(method);
-  }
-
-  @SuppressWarnings("unused")
-  private void badGenericWildCards(
-      DoFn<Integer, String>.ProcessContext c,
-      DoFn.InputProvider<Integer> input,
-      DoFn.OutputReceiver<? super Integer> output) {}
-
-  @Test
-  public void testBadGenericWildCards() throws Exception {
-    Method method =
-        getClass()
-            .getDeclaredMethod(
-                "badGenericWildCards",
-                DoFn.ProcessContext.class,
-                DoFn.InputProvider.class,
-                DoFn.OutputReceiver.class);
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Incompatible generics in context parameter "
-        + "OutputReceiver<? super Integer> for method "
-        + getClass().getName()
-        + "#badGenericWildCards(ProcessContext, InputProvider, OutputReceiver). Should be "
-        + "OutputReceiver<String>");
-
-    DoFnReflector.verifyProcessMethodArguments(method);
-  }
-
-  @SuppressWarnings("unused")
-  private <InputT, OutputT> void badTypeVariables(DoFn<InputT, OutputT>.ProcessContext c,
-      DoFn.InputProvider<InputT> input, DoFn.OutputReceiver<InputT> output) {}
-
-  @Test
-  public void testBadTypeVariables() throws Exception {
-    Method method =
-        getClass()
-            .getDeclaredMethod(
-                "badTypeVariables",
-                DoFn.ProcessContext.class,
-                DoFn.InputProvider.class,
-                DoFn.OutputReceiver.class);
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Incompatible generics in context parameter "
-        + "OutputReceiver<InputT> for method " + getClass().getName()
-        + "#badTypeVariables(ProcessContext, InputProvider, OutputReceiver). Should be "
-        + "OutputReceiver<OutputT>");
-
-    DoFnReflector.verifyProcessMethodArguments(method);
-  }
-
-  @Test
-  public void testProcessElementException() throws Exception {
-    DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
-      @ProcessElement
-      public void processElement(@SuppressWarnings("unused") ProcessContext c) {
-        throw new IllegalArgumentException("bogus");
-      }
-    };
-
-    thrown.expect(UserCodeException.class);
-    thrown.expectMessage("bogus");
-    DoFnReflector.of(fn.getClass()).bindInvoker(fn).invokeProcessElement(null, null);
-  }
-
-  @Test
-  public void testStartBundleException() throws Exception {
-    DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
-      @StartBundle
-      public void startBundle(@SuppressWarnings("unused") Context c) {
-        throw new IllegalArgumentException("bogus");
-      }
-
-      @ProcessElement
-      public void processElement(@SuppressWarnings("unused") ProcessContext c) {
-      }
-    };
-
-    thrown.expect(UserCodeException.class);
-    thrown.expectMessage("bogus");
-    DoFnReflector.of(fn.getClass()).bindInvoker(fn).invokeStartBundle(null, null);
-  }
-
-  @Test
-  public void testFinishBundleException() throws Exception {
-    DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
-      @FinishBundle
-      public void finishBundle(@SuppressWarnings("unused") Context c) {
-        throw new IllegalArgumentException("bogus");
-      }
-
-      @ProcessElement
-      public void processElement(@SuppressWarnings("unused") ProcessContext c) {
-      }
-    };
-
-    thrown.expect(UserCodeException.class);
-    thrown.expectMessage("bogus");
-    DoFnReflector.of(fn.getClass()).bindInvoker(fn).invokeFinishBundle(null, null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index 604536b..3469223 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -99,7 +99,7 @@ public class FlattenTest implements Serializable {
     PCollection<String> output =
         makePCollectionListOfStrings(p, inputs)
         .apply(Flatten.<String>pCollections())
-        .apply(ParDo.of(new IdentityFn<String>(){}));
+        .apply(ParDo.of(new IdentityFn<String>()));
 
     PAssert.that(output).containsInAnyOrder(flattenLists(inputs));
     p.run();
@@ -152,7 +152,7 @@ public class FlattenTest implements Serializable {
     PCollection<String> output =
         PCollectionList.<String>empty(p)
         .apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of())
-        .apply(ParDo.of(new IdentityFn<String>(){}));
+        .apply(ParDo.of(new IdentityFn<String>()));
 
     PAssert.that(output).empty();
     p.run();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java
deleted file mode 100644
index 90fba12..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.dofnreflector;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnReflectorTest.Invocations;
-
-/**
- * Test helper for DoFnReflectorTest, which needs to test package-private access
- * to DoFns in other packages.
- */
-public class DoFnReflectorTestHelper {
-
-  private static class StaticPrivateDoFn extends DoFn<String, String> {
-    final Invocations invocations;
-
-    public StaticPrivateDoFn(Invocations invocations) {
-      this.invocations = invocations;
-    }
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-    }
-  }
-
-  private class InnerPrivateDoFn extends DoFn<String, String> {
-    final Invocations invocations;
-
-    public InnerPrivateDoFn(Invocations invocations) {
-      this.invocations = invocations;
-    }
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-    }
-  }
-
-  static class StaticPackagePrivateDoFn extends DoFn<String, String> {
-    final Invocations invocations;
-
-    public StaticPackagePrivateDoFn(Invocations invocations) {
-      this.invocations = invocations;
-    }
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-    }
-  }
-
-  class InnerPackagePrivateDoFn extends DoFn<String, String> {
-    final Invocations invocations;
-
-    public InnerPackagePrivateDoFn(Invocations invocations) {
-      this.invocations = invocations;
-    }
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-    }
-  }
-
-  public static DoFn<String, String> newStaticPackagePrivateDoFn(
-      Invocations invocations) {
-    return new StaticPackagePrivateDoFn(invocations);
-  }
-
-  public DoFn<String, String> newInnerPackagePrivateDoFn(Invocations invocations) {
-    return new InnerPackagePrivateDoFn(invocations);
-  }
-
-  public static DoFn<String, String> newStaticPrivateDoFn(Invocations invocations) {
-    return new StaticPrivateDoFn(invocations);
-  }
-
-  public DoFn<String, String> newInnerPrivateDoFn(Invocations invocations) {
-    return new InnerPrivateDoFn(invocations);
-  }
-
-  public DoFn<String, String> newInnerAnonymousDoFn(final Invocations invocations) {
-    return new DoFn<String, String>() {
-      @ProcessElement
-      public void process(ProcessContext c) {
-        invocations.wasProcessElementInvoked = true;
-      }
-    };
-  }
-
-  public static DoFn<String, String> newStaticAnonymousDoFn(
-      final Invocations invocations) {
-    return new DoFn<String, String>() {
-      @ProcessElement
-      public void process(ProcessContext c) {
-        invocations.wasProcessElementInvoked = true;
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
new file mode 100644
index 0000000..7e756e2
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.UserCodeException;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link DoFnInvokers}. */
+public class DoFnInvokersTest {
+  /** A convenience struct holding flags that indicate whether a particular method was invoked. */
+  public static class Invocations {
+    public boolean wasProcessElementInvoked = false;
+    public boolean wasStartBundleInvoked = false;
+    public boolean wasFinishBundleInvoked = false;
+    public boolean wasSetupInvoked = false;
+    public boolean wasTeardownInvoked = false;
+    private final String name;
+
+    public Invocations(String name) {
+      this.name = name;
+    }
+  }
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Mock private DoFn.ProcessContext mockContext;
+  @Mock private BoundedWindow mockWindow;
+  @Mock private DoFn.InputProvider<String> mockInputProvider;
+  @Mock private DoFn.OutputReceiver<String> mockOutputReceiver;
+
+  private DoFn.ExtraContextFactory<String, String> extraContextFactory;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    this.extraContextFactory =
+        new DoFn.ExtraContextFactory<String, String>() {
+          @Override
+          public BoundedWindow window() {
+            return mockWindow;
+          }
+
+          @Override
+          public DoFn.InputProvider<String> inputProvider() {
+            return mockInputProvider;
+          }
+
+          @Override
+          public DoFn.OutputReceiver<String> outputReceiver() {
+            return mockOutputReceiver;
+          }
+        };
+  }
+
+  private void checkInvokeProcessElementWorks(DoFn<String, String> fn, Invocations... invocations)
+      throws Exception {
+    assertTrue("Need at least one invocation to check", invocations.length >= 1);
+    for (Invocations invocation : invocations) {
+      assertFalse(
+          "Should not yet have called processElement on " + invocation.name,
+          invocation.wasProcessElementInvoked);
+    }
+    DoFnInvokers.INSTANCE
+        .newByteBuddyInvoker(fn)
+        .invokeProcessElement(mockContext, extraContextFactory);
+    for (Invocations invocation : invocations) {
+      assertTrue(
+          "Should have called processElement on " + invocation.name,
+          invocation.wasProcessElementInvoked);
+    }
+  }
+
+  private void checkInvokeStartBundleWorks(DoFn<String, String> fn, Invocations... invocations)
+      throws Exception {
+    assertTrue("Need at least one invocation to check", invocations.length >= 1);
+    for (Invocations invocation : invocations) {
+      assertFalse(
+          "Should not yet have called startBundle on " + invocation.name,
+          invocation.wasStartBundleInvoked);
+    }
+    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeStartBundle(mockContext);
+    for (Invocations invocation : invocations) {
+      assertTrue(
+          "Should have called startBundle on " + invocation.name, invocation.wasStartBundleInvoked);
+    }
+  }
+
+  private void checkInvokeFinishBundleWorks(DoFn<String, String> fn, Invocations... invocations)
+      throws Exception {
+    assertTrue("Need at least one invocation to check", invocations.length >= 1);
+    for (Invocations invocation : invocations) {
+      assertFalse(
+          "Should not yet have called finishBundle on " + invocation.name,
+          invocation.wasFinishBundleInvoked);
+    }
+    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeFinishBundle(mockContext);
+    for (Invocations invocation : invocations) {
+      assertTrue(
+          "Should have called finishBundle on " + invocation.name,
+          invocation.wasFinishBundleInvoked);
+    }
+  }
+
+  private void checkInvokeSetupWorks(DoFn<String, String> fn, Invocations... invocations)
+      throws Exception {
+    assertTrue("Need at least one invocation to check", invocations.length >= 1);
+    for (Invocations invocation : invocations) {
+      assertFalse(
+          "Should not yet have called setup on " + invocation.name, invocation.wasSetupInvoked);
+    }
+    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeSetup();
+    for (Invocations invocation : invocations) {
+      assertTrue("Should have called setup on " + invocation.name, invocation.wasSetupInvoked);
+    }
+  }
+
+  private void checkInvokeTeardownWorks(DoFn<String, String> fn, Invocations... invocations)
+      throws Exception {
+    assertTrue("Need at least one invocation to check", invocations.length >= 1);
+    for (Invocations invocation : invocations) {
+      assertFalse(
+          "Should not yet have called teardown on " + invocation.name,
+          invocation.wasTeardownInvoked);
+    }
+    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeTeardown();
+    for (Invocations invocation : invocations) {
+      assertTrue(
+          "Should have called teardown on " + invocation.name, invocation.wasTeardownInvoked);
+    }
+  }
+
+  @Test
+  public void testDoFnWithNoExtraContext() throws Exception {
+    final Invocations invocations = new Invocations("AnonymousClass");
+    DoFn<String, String> fn =
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) throws Exception {
+            invocations.wasProcessElementInvoked = true;
+            assertSame(c, mockContext);
+          }
+        };
+
+    assertFalse(
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(fn.getClass())
+            .processElement()
+            .usesSingleWindow());
+
+    checkInvokeProcessElementWorks(fn, invocations);
+  }
+
+  @Test
+  public void testDoFnInvokersReused() throws Exception {
+    // Ensures that we don't create a new Invoker class for every instance of the DoFn.
+    IdentityParent fn1 = new IdentityParent();
+    IdentityParent fn2 = new IdentityParent();
+    assertSame(
+        "Invoker classes should only be generated once for each type",
+        DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn1).getClass(),
+        DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn2).getClass());
+  }
+
+  interface InterfaceWithProcessElement {
+    @DoFn.ProcessElement
+    void processElement(DoFn<String, String>.ProcessContext c);
+  }
+
+  interface LayersOfInterfaces extends InterfaceWithProcessElement {}
+
+  private class IdentityUsingInterfaceWithProcessElement extends DoFn<String, String>
+      implements LayersOfInterfaces {
+
+    private Invocations invocations = new Invocations("Named Class");
+
+    @Override
+    public void processElement(DoFn<String, String>.ProcessContext c) {
+      invocations.wasProcessElementInvoked = true;
+      assertSame(c, mockContext);
+    }
+  }
+
+  @Test
+  public void testDoFnWithProcessElementInterface() throws Exception {
+    IdentityUsingInterfaceWithProcessElement fn = new IdentityUsingInterfaceWithProcessElement();
+    assertFalse(
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(fn.getClass())
+            .processElement()
+            .usesSingleWindow());
+    checkInvokeProcessElementWorks(fn, fn.invocations);
+  }
+
+  private class IdentityParent extends DoFn<String, String> {
+    protected Invocations parentInvocations = new Invocations("IdentityParent");
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      parentInvocations.wasProcessElementInvoked = true;
+      assertSame(c, mockContext);
+    }
+  }
+
+  private class IdentityChildWithoutOverride extends IdentityParent {}
+
+  private class IdentityChildWithOverride extends IdentityParent {
+    protected Invocations childInvocations = new Invocations("IdentityChildWithOverride");
+
+    @Override
+    public void process(DoFn<String, String>.ProcessContext c) {
+      super.process(c);
+      childInvocations.wasProcessElementInvoked = true;
+    }
+  }
+
+  @Test
+  public void testDoFnWithMethodInSuperclass() throws Exception {
+    IdentityChildWithoutOverride fn = new IdentityChildWithoutOverride();
+    assertFalse(
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(fn.getClass())
+            .processElement()
+            .usesSingleWindow());
+    checkInvokeProcessElementWorks(fn, fn.parentInvocations);
+  }
+
+  @Test
+  public void testDoFnWithMethodInSubclass() throws Exception {
+    IdentityChildWithOverride fn = new IdentityChildWithOverride();
+    assertFalse(
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(fn.getClass())
+            .processElement()
+            .usesSingleWindow());
+    checkInvokeProcessElementWorks(fn, fn.parentInvocations, fn.childInvocations);
+  }
+
+  @Test
+  public void testDoFnWithWindow() throws Exception {
+    final Invocations invocations = new Invocations("AnonymousClass");
+    DoFn<String, String> fn =
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext c, BoundedWindow w) throws Exception {
+            invocations.wasProcessElementInvoked = true;
+            assertSame(c, mockContext);
+            assertSame(w, mockWindow);
+          }
+        };
+
+    assertTrue(
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(fn.getClass())
+            .processElement()
+            .usesSingleWindow());
+
+    checkInvokeProcessElementWorks(fn, invocations);
+  }
+
+  @Test
+  public void testDoFnWithOutputReceiver() throws Exception {
+    final Invocations invocations = new Invocations("AnonymousClass");
+    DoFn<String, String> fn =
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext c, OutputReceiver<String> o) throws Exception {
+            invocations.wasProcessElementInvoked = true;
+            assertSame(c, mockContext);
+            assertSame(o, mockOutputReceiver);
+          }
+        };
+
+    assertFalse(
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(fn.getClass())
+            .processElement()
+            .usesSingleWindow());
+
+    checkInvokeProcessElementWorks(fn, invocations);
+  }
+
+  @Test
+  public void testDoFnWithInputProvider() throws Exception {
+    final Invocations invocations = new Invocations("AnonymousClass");
+    DoFn<String, String> fn =
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext c, InputProvider<String> i) throws Exception {
+            invocations.wasProcessElementInvoked = true;
+            assertSame(c, mockContext);
+            assertSame(i, mockInputProvider);
+          }
+        };
+
+    assertFalse(
+        DoFnSignatures.INSTANCE
+            .getOrParseSignature(fn.getClass())
+            .processElement()
+            .usesSingleWindow());
+
+    checkInvokeProcessElementWorks(fn, invocations);
+  }
+
+  @Test
+  public void testDoFnWithStartBundle() throws Exception {
+    final Invocations invocations = new Invocations("AnonymousClass");
+    DoFn<String, String> fn =
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+
+          @StartBundle
+          public void startBundle(Context c) {
+            invocations.wasStartBundleInvoked = true;
+            assertSame(c, mockContext);
+          }
+
+          @FinishBundle
+          public void finishBundle(Context c) {
+            invocations.wasFinishBundleInvoked = true;
+            assertSame(c, mockContext);
+          }
+        };
+
+    checkInvokeStartBundleWorks(fn, invocations);
+    checkInvokeFinishBundleWorks(fn, invocations);
+  }
+
+  @Test
+  public void testDoFnWithSetupTeardown() throws Exception {
+    final Invocations invocations = new Invocations("AnonymousClass");
+    DoFn<String, String> fn =
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+
+          @StartBundle
+          public void startBundle(Context c) {
+            invocations.wasStartBundleInvoked = true;
+            assertSame(c, mockContext);
+          }
+
+          @FinishBundle
+          public void finishBundle(Context c) {
+            invocations.wasFinishBundleInvoked = true;
+            assertSame(c, mockContext);
+          }
+
+          @Setup
+          public void before() {
+            invocations.wasSetupInvoked = true;
+          }
+
+          @Teardown
+          public void after() {
+            invocations.wasTeardownInvoked = true;
+          }
+        };
+
+    checkInvokeSetupWorks(fn, invocations);
+    checkInvokeTeardownWorks(fn, invocations);
+  }
+
+  private static class PrivateDoFnClass extends DoFn<String, String> {
+    final Invocations invocations = new Invocations(getClass().getName());
+
+    @ProcessElement
+    public void processThis(ProcessContext c) {
+      invocations.wasProcessElementInvoked = true;
+    }
+  }
+
+  @Test
+  public void testLocalPrivateDoFnClass() throws Exception {
+    PrivateDoFnClass fn = new PrivateDoFnClass();
+    checkInvokeProcessElementWorks(fn, fn.invocations);
+  }
+
+  @Test
+  public void testStaticPackagePrivateDoFnClass() throws Exception {
+    Invocations invocations = new Invocations("StaticPackagePrivateDoFn");
+    checkInvokeProcessElementWorks(
+        DoFnInvokersTestHelper.newStaticPackagePrivateDoFn(invocations), invocations);
+  }
+
+  @Test
+  public void testInnerPackagePrivateDoFnClass() throws Exception {
+    Invocations invocations = new Invocations("InnerPackagePrivateDoFn");
+    checkInvokeProcessElementWorks(
+        new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn(invocations), invocations);
+  }
+
+  @Test
+  public void testStaticPrivateDoFnClass() throws Exception {
+    Invocations invocations = new Invocations("StaticPrivateDoFn");
+    checkInvokeProcessElementWorks(
+        DoFnInvokersTestHelper.newStaticPrivateDoFn(invocations), invocations);
+  }
+
+  @Test
+  public void testInnerPrivateDoFnClass() throws Exception {
+    Invocations invocations = new Invocations("StaticInnerDoFn");
+    checkInvokeProcessElementWorks(
+        new DoFnInvokersTestHelper().newInnerPrivateDoFn(invocations), invocations);
+  }
+
+  @Test
+  public void testAnonymousInnerDoFnInOtherPackage() throws Exception {
+    Invocations invocations = new Invocations("AnonymousInnerDoFnInOtherPackage");
+    checkInvokeProcessElementWorks(
+        new DoFnInvokersTestHelper().newInnerAnonymousDoFn(invocations), invocations);
+  }
+
+  @Test
+  public void testStaticAnonymousDoFnInOtherPackage() throws Exception {
+    Invocations invocations = new Invocations("AnonymousStaticDoFnInOtherPackage");
+    checkInvokeProcessElementWorks(
+        DoFnInvokersTestHelper.newStaticAnonymousDoFn(invocations), invocations);
+  }
+
+  @Test
+  public void testProcessElementException() throws Exception {
+    DoFn<Integer, Integer> fn =
+        new DoFn<Integer, Integer>() {
+          @ProcessElement
+          public void processElement(@SuppressWarnings("unused") ProcessContext c) {
+            throw new IllegalArgumentException("bogus");
+          }
+        };
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectMessage("bogus");
+    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeProcessElement(null, null);
+  }
+
+  @Test
+  public void testStartBundleException() throws Exception {
+    DoFn<Integer, Integer> fn =
+        new DoFn<Integer, Integer>() {
+          @StartBundle
+          public void startBundle(@SuppressWarnings("unused") Context c) {
+            throw new IllegalArgumentException("bogus");
+          }
+
+          @ProcessElement
+          public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+        };
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectMessage("bogus");
+    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeStartBundle(null);
+  }
+
+  @Test
+  public void testFinishBundleException() throws Exception {
+    DoFn<Integer, Integer> fn =
+        new DoFn<Integer, Integer>() {
+          @FinishBundle
+          public void finishBundle(@SuppressWarnings("unused") Context c) {
+            throw new IllegalArgumentException("bogus");
+          }
+
+          @ProcessElement
+          public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+        };
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectMessage("bogus");
+    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeFinishBundle(null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
new file mode 100644
index 0000000..7bfdddc
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokersTest.Invocations;
+
+/**
+ * Test helper for {@link DoFnInvokersTest}, which needs to test package-private access
+ * to DoFns in other packages.
+ */
+public class DoFnInvokersTestHelper {
+
+  private static class StaticPrivateDoFn extends DoFn<String, String> {
+    final Invocations invocations;
+
+    public StaticPrivateDoFn(Invocations invocations) {
+      this.invocations = invocations;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      invocations.wasProcessElementInvoked = true;
+    }
+  }
+
+  private class InnerPrivateDoFn extends DoFn<String, String> {
+    final Invocations invocations;
+
+    public InnerPrivateDoFn(Invocations invocations) {
+      this.invocations = invocations;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      invocations.wasProcessElementInvoked = true;
+    }
+  }
+
+  static class StaticPackagePrivateDoFn extends DoFn<String, String> {
+    final Invocations invocations;
+
+    public StaticPackagePrivateDoFn(Invocations invocations) {
+      this.invocations = invocations;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      invocations.wasProcessElementInvoked = true;
+    }
+  }
+
+  class InnerPackagePrivateDoFn extends DoFn<String, String> {
+    final Invocations invocations;
+
+    public InnerPackagePrivateDoFn(Invocations invocations) {
+      this.invocations = invocations;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      invocations.wasProcessElementInvoked = true;
+    }
+  }
+
+  public static DoFn<String, String> newStaticPackagePrivateDoFn(
+      Invocations invocations) {
+    return new StaticPackagePrivateDoFn(invocations);
+  }
+
+  public DoFn<String, String> newInnerPackagePrivateDoFn(Invocations invocations) {
+    return new InnerPackagePrivateDoFn(invocations);
+  }
+
+  public static DoFn<String, String> newStaticPrivateDoFn(Invocations invocations) {
+    return new StaticPrivateDoFn(invocations);
+  }
+
+  public DoFn<String, String> newInnerPrivateDoFn(Invocations invocations) {
+    return new InnerPrivateDoFn(invocations);
+  }
+
+  public DoFn<String, String> newInnerAnonymousDoFn(final Invocations invocations) {
+    return new DoFn<String, String>() {
+      @ProcessElement
+      public void process(ProcessContext c) {
+        invocations.wasProcessElementInvoked = true;
+      }
+    };
+  }
+
+  public static DoFn<String, String> newStaticAnonymousDoFn(
+      final Invocations invocations) {
+    return new DoFn<String, String>() {
+      @ProcessElement
+      public void process(ProcessContext c) {
+        invocations.wasProcessElementInvoked = true;
+      }
+    };
+  }
+}