You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/09/12 17:58:34 UTC

[2/3] incubator-beam git commit: More changes to DoFn{Signatures, Invokers}

More changes to DoFn{Signatures,Invokers}

In preparation for Splittable DoFn:
* More generic code generation in DoFnInvokers:
  supports methods with return values (thanks @bjchambers).
* Uses AutoValue builder in DoFnSignature.
* Contextual error reporting in DoFnSignatures parsing code.
* Rewrote DoFnInvokers tests to use Mockito.
* Changed DoFnSignatures tests to use local classes
  and an "AnonymousMethod" class for testing analysis of
  single methods.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/75c8bb8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/75c8bb8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/75c8bb8d

Branch: refs/heads/master
Commit: 75c8bb8dd3127b4dce63e8359ae27185f246b28c
Parents: 05c6c27
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 11 17:13:53 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Mon Sep 12 10:08:57 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/transforms/DoFnAdapters.java       |   2 +-
 .../sdk/transforms/reflect/DoFnInvoker.java     |  20 +-
 .../sdk/transforms/reflect/DoFnInvokers.java    | 408 +++++++++--------
 .../sdk/transforms/reflect/DoFnSignature.java   |  71 ++-
 .../sdk/transforms/reflect/DoFnSignatures.java  | 245 +++++-----
 .../beam/sdk/util/common/ReflectHelpers.java    |  22 -
 .../transforms/reflect/DoFnInvokersTest.java    | 455 ++++++-------------
 .../reflect/DoFnInvokersTestHelper.java         | 116 -----
 .../DoFnSignaturesProcessElementTest.java       | 213 +++++++++
 .../transforms/reflect/DoFnSignaturesTest.java  | 269 +----------
 .../reflect/DoFnSignaturesTestUtils.java        |  64 +++
 .../testhelper/DoFnInvokersTestHelper.java      | 124 +++++
 12 files changed, 968 insertions(+), 1041 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 4803d77..77a71e9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -86,7 +86,7 @@ public class DoFnAdapters {
 
     @Override
     public void startBundle(Context c) throws Exception {
-      this.fn.prepareForProcessing();
+      fn.prepareForProcessing();
       invoker.invokeStartBundle(new ContextAdapter<>(fn, c));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 5818a59..9de6759 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -26,28 +26,16 @@ import org.apache.beam.sdk.transforms.DoFn;
  * referred to as the bound {@link DoFn}.
  */
 public interface DoFnInvoker<InputT, OutputT> {
-  /**
-   * Invoke the {@link DoFn.Setup} method on the bound {@link DoFn}.
-   */
+  /** Invoke the {@link DoFn.Setup} method on the bound {@link DoFn}. */
   void invokeSetup();
 
-  /**
-   * Invoke the {@link DoFn.StartBundle} method on the bound {@link DoFn}.
-   *
-   * @param c The {@link DoFn.Context} to invoke the fn with.
-   */
+  /** Invoke the {@link DoFn.StartBundle} method on the bound {@link DoFn}. */
   void invokeStartBundle(DoFn<InputT, OutputT>.Context c);
 
-  /**
-   * Invoke the {@link DoFn.FinishBundle} method on the bound {@link DoFn}.
-   *
-   * @param c The {@link DoFn.Context} to invoke the fn with.
-   */
+  /** Invoke the {@link DoFn.FinishBundle} method on the bound {@link DoFn}. */
   void invokeFinishBundle(DoFn<InputT, OutputT>.Context c);
 
-  /**
-   * Invoke the {@link DoFn.Teardown} method on the bound {@link DoFn}.
-   */
+  /** Invoke the {@link DoFn.Teardown} method on the bound {@link DoFn}. */
   void invokeTeardown();
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index 68e2ca9..f622015 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -18,10 +18,10 @@
 package org.apache.beam.sdk.transforms.reflect;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumMap;
@@ -39,12 +39,15 @@ import net.bytebuddy.dynamic.DynamicType;
 import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.dynamic.scaffold.InstrumentedType;
 import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
+import net.bytebuddy.implementation.FixedValue;
 import net.bytebuddy.implementation.Implementation;
-import net.bytebuddy.implementation.MethodCall;
-import net.bytebuddy.implementation.bind.MethodDelegationBinder;
+import net.bytebuddy.implementation.Implementation.Context;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.TargetMethodAnnotationDrivenBinder;
 import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
 import net.bytebuddy.implementation.bytecode.StackManipulation;
 import net.bytebuddy.implementation.bytecode.Throw;
+import net.bytebuddy.implementation.bytecode.assign.Assigner;
 import net.bytebuddy.implementation.bytecode.member.FieldAccess;
 import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
 import net.bytebuddy.implementation.bytecode.member.MethodReturn;
@@ -52,13 +55,10 @@ import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
 import net.bytebuddy.jar.asm.Label;
 import net.bytebuddy.jar.asm.MethodVisitor;
 import net.bytebuddy.jar.asm.Opcodes;
+import net.bytebuddy.jar.asm.Type;
 import net.bytebuddy.matcher.ElementMatchers;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
-import org.apache.beam.sdk.transforms.DoFn.Setup;
-import org.apache.beam.sdk.transforms.DoFn.StartBundle;
-import org.apache.beam.sdk.transforms.DoFn.Teardown;
 import org.apache.beam.sdk.util.UserCodeException;
 
 /** Dynamically generates {@link DoFnInvoker} instances for invoking a {@link DoFn}. */
@@ -153,25 +153,13 @@ public class DoFnInvokers {
             .method(ElementMatchers.named("invokeProcessElement"))
             .intercept(new ProcessElementDelegation(signature.processElement()))
             .method(ElementMatchers.named("invokeStartBundle"))
-            .intercept(
-                signature.startBundle() == null
-                    ? new NoopMethodImplementation()
-                    : new BundleMethodDelegation(signature.startBundle()))
+            .intercept(delegateOrNoop(signature.startBundle()))
             .method(ElementMatchers.named("invokeFinishBundle"))
-            .intercept(
-                signature.finishBundle() == null
-                    ? new NoopMethodImplementation()
-                    : new BundleMethodDelegation(signature.finishBundle()))
+            .intercept(delegateOrNoop(signature.finishBundle()))
             .method(ElementMatchers.named("invokeSetup"))
-            .intercept(
-                signature.setup() == null
-                    ? new NoopMethodImplementation()
-                    : new LifecycleMethodDelegation(signature.setup()))
+            .intercept(delegateOrNoop(signature.setup()))
             .method(ElementMatchers.named("invokeTeardown"))
-            .intercept(
-                signature.teardown() == null
-                    ? new NoopMethodImplementation()
-                    : new LifecycleMethodDelegation(signature.teardown()));
+            .intercept(delegateOrNoop(signature.teardown()));
 
     DynamicType.Unloaded<?> unloaded = builder.make();
 
@@ -184,35 +172,29 @@ public class DoFnInvokers {
     return res;
   }
 
-  /** Implements an invoker method by doing nothing and immediately returning void. */
-  private static class NoopMethodImplementation implements Implementation {
-    @Override
-    public InstrumentedType prepare(InstrumentedType instrumentedType) {
-      return instrumentedType;
-    }
-
-    @Override
-    public ByteCodeAppender appender(final Target implementationTarget) {
-      return new ByteCodeAppender() {
-        @Override
-        public Size apply(
-            MethodVisitor methodVisitor,
-            Context implementationContext,
-            MethodDescription instrumentedMethod) {
-          StackManipulation manipulation = MethodReturn.VOID;
-          StackManipulation.Size size = manipulation.apply(methodVisitor, implementationContext);
-          return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
-        }
-      };
-    }
+  /** Delegates to the given method if available, or does nothing. */
+  private static Implementation delegateOrNoop(DoFnSignature.DoFnMethod method) {
+    return (method == null)
+        ? FixedValue.originType()
+        : new DoFnMethodDelegation(method.targetMethod());
   }
 
   /**
-   * Base class for implementing an invoker method by delegating to a method of the target {@link
-   * DoFn}.
+   * Implements a method of {@link DoFnInvoker} (the "instrumented method") by delegating to a
+   * "target method" of the wrapped {@link DoFn}.
    */
-  private abstract static class MethodDelegation implements Implementation {
-    FieldDescription delegateField;
+  private static class DoFnMethodDelegation implements Implementation {
+    /** The {@link MethodDescription} of the wrapped {@link DoFn}'s method. */
+    protected final MethodDescription targetMethod;
+    /** Whether the target method returns non-void. */
+    private final boolean targetHasReturn;
+
+    private FieldDescription delegateField;
+
+    public DoFnMethodDelegation(Method targetMethod) {
+      this.targetMethod = new MethodDescription.ForLoadedMethod(targetMethod);
+      targetHasReturn = !TypeDescription.VOID.equals(this.targetMethod.getReturnType().asErasure());
+    }
 
     @Override
     public InstrumentedType prepare(InstrumentedType instrumentedType) {
@@ -222,7 +204,6 @@ public class DoFnInvokers {
               .getDeclaredFields()
               .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
               .getOnly();
-
       // Delegating the method call doesn't require any changes to the instrumented type.
       return instrumentedType;
     }
@@ -230,54 +211,102 @@ public class DoFnInvokers {
     @Override
     public ByteCodeAppender appender(final Target implementationTarget) {
       return new ByteCodeAppender() {
+        /**
+         * @param instrumentedMethod The {@link DoFnInvoker} method for which we're generating code.
+         */
         @Override
         public Size apply(
             MethodVisitor methodVisitor,
             Context implementationContext,
             MethodDescription instrumentedMethod) {
+          // Figure out how many locals we'll need. This corresponds to "this", the parameters
+          // of the instrumented method, and an argument to hold the return value if the target
+          // method has a return value.
+          int numLocals = 1 + instrumentedMethod.getParameters().size() + (targetHasReturn ? 1 : 0);
+
+          Integer returnVarIndex = null;
+          if (targetHasReturn) {
+            // Local comes after formal parameters, so figure out where that is.
+            returnVarIndex = 1; // "this"
+            for (Type param : Type.getArgumentTypes(instrumentedMethod.getDescriptor())) {
+              returnVarIndex += param.getSize();
+            }
+          }
+
           StackManipulation manipulation =
               new StackManipulation.Compound(
-                  // Push "this" reference to the stack
+                  // Push "this" (DoFnInvoker on top of the stack)
                   MethodVariableAccess.REFERENCE.loadOffset(0),
-                  // Access the delegate field of the the invoker
+                  // Access this.delegate (DoFn on top of the stack)
                   FieldAccess.forField(delegateField).getter(),
-                  invokeTargetMethod(instrumentedMethod));
+                  // Run the beforeDelegation manipulations.
+                  // The arguments necessary to invoke the target are on top of the stack.
+                  beforeDelegation(instrumentedMethod),
+                  // Perform the method delegation.
+                  // This will consume the arguments on top of the stack
+                  // Either the stack is now empty (because the targetMethod returns void) or the
+                  // stack contains the return value.
+                  new UserCodeMethodInvocation(returnVarIndex, targetMethod, instrumentedMethod),
+                  // Run the afterDelegation manipulations.
+                  // Either the stack is now empty (because the instrumentedMethod returns void)
+                  // or the stack contains the return value.
+                  afterDelegation(instrumentedMethod));
+
           StackManipulation.Size size = manipulation.apply(methodVisitor, implementationContext);
-          return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
+          return new Size(size.getMaximalSize(), numLocals);
         }
       };
     }
 
     /**
-     * Generates code to invoke the target method. When this is called the delegate field will be on
-     * top of the stack. This should add any necessary arguments to the stack and then perform the
-     * method invocation.
+     * Return the code to the prepare the operand stack for the method delegation.
+     *
+     * <p>Before this method is called, the stack delegate will be the only thing on the stack.
+     *
+     * <p>After this method is called, the stack contents should contain exactly the arguments
+     * necessary to invoke the target method.
      */
-    protected abstract StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod);
+    protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
+      return MethodVariableAccess.allArgumentsOf(targetMethod);
+    }
+
+    /**
+     * Return the code to execute after the method delegation.
+     *
+     * <p>Before this method is called, the stack will either be empty (if the target method returns
+     * void) or contain the method return value.
+     *
+     * <p>After this method is called, the stack should either be empty (if the instrumented method
+     * returns void) or contain the value for the instrumented method to return).
+     */
+    protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
+      return TargetMethodAnnotationDrivenBinder.TerminationHandler.Returning.INSTANCE.resolve(
+          Assigner.DEFAULT, instrumentedMethod, targetMethod);
+    }
   }
 
   /**
    * Implements the invoker's {@link DoFnInvoker#invokeProcessElement} method by delegating to the
    * {@link DoFn.ProcessElement} method.
    */
-  private static final class ProcessElementDelegation extends MethodDelegation {
-    private static final Map<DoFnSignature.ProcessElementMethod.Parameter, MethodDescription>
+  private static final class ProcessElementDelegation extends DoFnMethodDelegation {
+    private static final Map<DoFnSignature.Parameter, MethodDescription>
         EXTRA_CONTEXT_FACTORY_METHODS;
 
     static {
       try {
-        Map<DoFnSignature.ProcessElementMethod.Parameter, MethodDescription> methods =
-            new EnumMap<>(DoFnSignature.ProcessElementMethod.Parameter.class);
+        Map<DoFnSignature.Parameter, MethodDescription> methods =
+            new EnumMap<>(DoFnSignature.Parameter.class);
         methods.put(
-            DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW,
+            DoFnSignature.Parameter.BOUNDED_WINDOW,
             new MethodDescription.ForLoadedMethod(
                 DoFn.ExtraContextFactory.class.getMethod("window")));
         methods.put(
-            DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER,
+            DoFnSignature.Parameter.INPUT_PROVIDER,
             new MethodDescription.ForLoadedMethod(
                 DoFn.ExtraContextFactory.class.getMethod("inputProvider")));
         methods.put(
-            DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER,
+            DoFnSignature.Parameter.OUTPUT_RECEIVER,
             new MethodDescription.ForLoadedMethod(
                 DoFn.ExtraContextFactory.class.getMethod("outputReceiver")));
         EXTRA_CONTEXT_FACTORY_METHODS = Collections.unmodifiableMap(methods);
@@ -291,16 +320,12 @@ public class DoFnInvokers {
 
     /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */
     private ProcessElementDelegation(DoFnSignature.ProcessElementMethod signature) {
+      super(signature.targetMethod());
       this.signature = signature;
     }
 
     @Override
-    protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
-      MethodDescription targetMethod =
-          new MethodCall.MethodLocator.ForExplicitMethod(
-                  new MethodDescription.ForLoadedMethod(signature.targetMethod()))
-              .resolve(instrumentedMethod);
-
+    protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
       // Parameters of the wrapper invoker method:
       //   DoFn.ProcessContext, ExtraContextFactory.
       // Parameters of the wrapped DoFn method:
@@ -310,144 +335,159 @@ public class DoFnInvokers {
       parameters.add(MethodVariableAccess.REFERENCE.loadOffset(1));
       // Push the extra arguments in their actual order.
       StackManipulation pushExtraContextFactory = MethodVariableAccess.REFERENCE.loadOffset(2);
-      for (DoFnSignature.ProcessElementMethod.Parameter param : signature.extraParameters()) {
+      for (DoFnSignature.Parameter param : signature.extraParameters()) {
         parameters.add(
             new StackManipulation.Compound(
                 pushExtraContextFactory,
                 MethodInvocation.invoke(EXTRA_CONTEXT_FACTORY_METHODS.get(param))));
       }
+      return new StackManipulation.Compound(parameters);
+    }
 
-      return new StackManipulation.Compound(
-          // Push the parameters
-          new StackManipulation.Compound(parameters),
-          // Invoke the target method
-          wrapWithUserCodeException(
-              MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
-          // Return from the instrumented method
-          MethodReturn.VOID);
+    @Override
+    protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
+      return MethodReturn.VOID;
     }
   }
 
-  /**
-   * Implements {@link DoFnInvoker#invokeStartBundle} or {@link DoFnInvoker#invokeFinishBundle} by
-   * delegating respectively to the {@link StartBundle} and {@link FinishBundle} methods.
-   */
-  private static final class BundleMethodDelegation extends MethodDelegation {
-    private final DoFnSignature.BundleMethod signature;
+  private static class UserCodeMethodInvocation implements StackManipulation {
 
-    private BundleMethodDelegation(@Nullable DoFnSignature.BundleMethod signature) {
-      this.signature = signature;
+    @Nullable private final Integer returnVarIndex;
+    private final MethodDescription targetMethod;
+    private final MethodDescription instrumentedMethod;
+    private final TypeDescription returnType;
+
+    private final Label wrapStart = new Label();
+    private final Label wrapEnd = new Label();
+    private final Label tryBlockStart = new Label();
+    private final Label tryBlockEnd = new Label();
+    private final Label catchBlockStart = new Label();
+    private final Label catchBlockEnd = new Label();
+
+    private final MethodDescription createUserCodeException;
+
+    UserCodeMethodInvocation(
+        @Nullable Integer returnVarIndex,
+        MethodDescription targetMethod,
+        MethodDescription instrumentedMethod) {
+      this.returnVarIndex = returnVarIndex;
+      this.targetMethod = targetMethod;
+      this.instrumentedMethod = instrumentedMethod;
+      this.returnType = targetMethod.getReturnType().asErasure();
+
+      boolean targetMethodReturnsVoid = TypeDescription.VOID.equals(returnType);
+      checkArgument(
+          (returnVarIndex == null) == targetMethodReturnsVoid,
+          "returnVarIndex should be defined if and only if the target method has a return value");
+
+      try {
+        createUserCodeException =
+            new MethodDescription.ForLoadedMethod(
+                UserCodeException.class.getDeclaredMethod("wrap", Throwable.class));
+      } catch (NoSuchMethodException | SecurityException e) {
+        throw new RuntimeException("Unable to find UserCodeException.wrap", e);
+      }
     }
 
     @Override
-    protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
-      MethodDescription targetMethod =
-          new MethodCall.MethodLocator.ForExplicitMethod(
-                  new MethodDescription.ForLoadedMethod(checkNotNull(signature).targetMethod()))
-              .resolve(instrumentedMethod);
-      return new StackManipulation.Compound(
-          // Push the parameters
-          MethodVariableAccess.REFERENCE.loadOffset(1),
-          // Invoke the target method
-          wrapWithUserCodeException(
-              MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
-          MethodReturn.VOID);
+    public boolean isValid() {
+      return true;
     }
-  }
 
-  /**
-   * Implements {@link DoFnInvoker#invokeSetup} or {@link DoFnInvoker#invokeTeardown} by delegating
-   * respectively to the {@link Setup} and {@link Teardown} methods.
-   */
-  private static final class LifecycleMethodDelegation extends MethodDelegation {
-    private final DoFnSignature.LifecycleMethod signature;
+    private Object describeType(Type type) {
+      switch (type.getSort()) {
+        case Type.OBJECT:
+          return type.getDescriptor();
+        case Type.INT:
+        case Type.BYTE:
+        case Type.BOOLEAN:
+        case Type.SHORT:
+          return Opcodes.INTEGER;
+        case Type.LONG:
+          return Opcodes.LONG;
+        case Type.DOUBLE:
+          return Opcodes.DOUBLE;
+        case Type.FLOAT:
+          return Opcodes.FLOAT;
+        default:
+          throw new IllegalArgumentException("Unhandled type as method argument: " + type);
+      }
+    }
 
-    private LifecycleMethodDelegation(@Nullable DoFnSignature.LifecycleMethod signature) {
-      this.signature = signature;
+    private void visitFrame(
+        MethodVisitor mv, boolean localsIncludeReturn, @Nullable String stackTop) {
+      boolean hasReturnLocal = (returnVarIndex != null) && localsIncludeReturn;
+
+      Type[] localTypes = Type.getArgumentTypes(instrumentedMethod.getDescriptor());
+      Object[] locals = new Object[1 + localTypes.length + (hasReturnLocal ? 1 : 0)];
+      locals[0] = instrumentedMethod.getReceiverType().asErasure().getInternalName();
+      for (int i = 0; i < localTypes.length; i++) {
+        locals[i + 1] = describeType(localTypes[i]);
+      }
+      if (hasReturnLocal) {
+        locals[locals.length - 1] = returnType.getInternalName();
+      }
+
+      Object[] stack = stackTop == null ? new Object[] {} : new Object[] {stackTop};
+
+      mv.visitFrame(Opcodes.F_NEW, locals.length, locals, stack.length, stack);
     }
 
     @Override
-    protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
-      MethodDescription targetMethod =
-          new MethodCall.MethodLocator.ForExplicitMethod(
-                  new MethodDescription.ForLoadedMethod(checkNotNull(signature).targetMethod()))
-              .resolve(instrumentedMethod);
-      return new StackManipulation.Compound(
-          wrapWithUserCodeException(
-              MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
-          MethodReturn.VOID);
-    }
-  }
+    public Size apply(MethodVisitor mv, Context context) {
+      Size size = new Size(0, 0);
 
-  /**
-   * Wraps a given stack manipulation in a try catch block. Any exceptions thrown within the try are
-   * wrapped with a {@link UserCodeException}.
-   */
-  private static StackManipulation wrapWithUserCodeException(final StackManipulation tryBody) {
-    final MethodDescription createUserCodeException;
-    try {
-      createUserCodeException =
-          new MethodDescription.ForLoadedMethod(
-              UserCodeException.class.getDeclaredMethod("wrap", Throwable.class));
-    } catch (NoSuchMethodException | SecurityException e) {
-      throw new RuntimeException("Unable to find UserCodeException.wrap", e);
-    }
+      mv.visitLabel(wrapStart);
+
+      String throwableName = new TypeDescription.ForLoadedType(Throwable.class).getInternalName();
+      mv.visitTryCatchBlock(tryBlockStart, tryBlockEnd, catchBlockStart, throwableName);
 
-    return new StackManipulation() {
-      @Override
-      public boolean isValid() {
-        return tryBody.isValid();
+      // The try block attempts to perform the expected operations, then jumps to success
+      mv.visitLabel(tryBlockStart);
+      size = size.aggregate(MethodInvocation.invoke(targetMethod).apply(mv, context));
+
+      if (returnVarIndex != null) {
+        mv.visitVarInsn(Opcodes.ASTORE, returnVarIndex);
+        size = size.aggregate(new Size(-1, 0)); // Reduces the size of the stack
       }
+      mv.visitJumpInsn(Opcodes.GOTO, catchBlockEnd);
+      mv.visitLabel(tryBlockEnd);
+
+      // The handler wraps the exception, and then throws.
+      mv.visitLabel(catchBlockStart);
+      // In catch block, should have same locals and {Throwable} on the stack.
+      visitFrame(mv, false, throwableName);
+
+      // Create the user code exception and throw
+      size =
+          size.aggregate(
+              new Compound(MethodInvocation.invoke(createUserCodeException), Throw.INSTANCE)
+                  .apply(mv, context));
+
+      mv.visitLabel(catchBlockEnd);
 
-      @Override
-      public Size apply(MethodVisitor mv, Implementation.Context implementationContext) {
-        Label tryBlockStart = new Label();
-        Label tryBlockEnd = new Label();
-        Label catchBlockStart = new Label();
-        Label catchBlockEnd = new Label();
-
-        String throwableName = new TypeDescription.ForLoadedType(Throwable.class).getInternalName();
-        mv.visitTryCatchBlock(tryBlockStart, tryBlockEnd, catchBlockStart, throwableName);
-
-        // The try block attempts to perform the expected operations, then jumps to success
-        mv.visitLabel(tryBlockStart);
-        Size trySize = tryBody.apply(mv, implementationContext);
-        mv.visitJumpInsn(Opcodes.GOTO, catchBlockEnd);
-        mv.visitLabel(tryBlockEnd);
-
-        // The handler wraps the exception, and then throws.
-        mv.visitLabel(catchBlockStart);
-        // Add the exception to the frame
-        mv.visitFrame(
-            Opcodes.F_SAME1,
-            // No local variables
-            0,
-            new Object[] {},
-            // 1 stack element (the throwable)
-            1,
-            new Object[] {throwableName});
-
-        Size catchSize =
-            new Compound(MethodInvocation.invoke(createUserCodeException), Throw.INSTANCE)
-                .apply(mv, implementationContext);
-
-        mv.visitLabel(catchBlockEnd);
-        // The frame contents after the try/catch block is the same
-        // as it was before.
-        mv.visitFrame(
-            Opcodes.F_SAME,
-            // No local variables
-            0,
-            new Object[] {},
-            // No new stack variables
-            0,
-            new Object[] {});
-
-        return new Size(
-            trySize.getSizeImpact(),
-            Math.max(trySize.getMaximalSize(), catchSize.getMaximalSize()));
+      // After the catch block we should have the return in scope, but nothing on the stack.
+      visitFrame(mv, true, null);
+
+      // After catch block, should have same locals and will have the return on the stack.
+      if (returnVarIndex != null) {
+        mv.visitVarInsn(Opcodes.ALOAD, returnVarIndex);
+        size = size.aggregate(new Size(1, 0)); // Increases the size of the stack
+      }
+      mv.visitLabel(wrapEnd);
+      if (returnVarIndex != null) {
+        // Drop the return type from the locals
+        mv.visitLocalVariable(
+            "res",
+            returnType.getDescriptor(),
+            returnType.getGenericSignature(),
+            wrapStart,
+            wrapEnd,
+            returnVarIndex);
       }
-    };
+
+      return size;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 181c088..b6864da 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -32,57 +32,74 @@ import org.apache.beam.sdk.transforms.DoFn;
  */
 @AutoValue
 public abstract class DoFnSignature {
+  /** Class of the original {@link DoFn} from which this signature was produced. */
   public abstract Class<? extends DoFn> fnClass();
 
+  /** Details about this {@link DoFn}'s {@link DoFn.ProcessElement} method. */
   public abstract ProcessElementMethod processElement();
 
+  /** Details about this {@link DoFn}'s {@link DoFn.StartBundle} method. */
   @Nullable
   public abstract BundleMethod startBundle();
 
+  /** Details about this {@link DoFn}'s {@link DoFn.FinishBundle} method. */
   @Nullable
   public abstract BundleMethod finishBundle();
 
+  /** Details about this {@link DoFn}'s {@link DoFn.Setup} method. */
   @Nullable
   public abstract LifecycleMethod setup();
 
+  /** Details about this {@link DoFn}'s {@link DoFn.Teardown} method. */
   @Nullable
   public abstract LifecycleMethod teardown();
 
-  static DoFnSignature create(
-      Class<? extends DoFn> fnClass,
-      ProcessElementMethod processElement,
-      @Nullable BundleMethod startBundle,
-      @Nullable BundleMethod finishBundle,
-      @Nullable LifecycleMethod setup,
-      @Nullable LifecycleMethod teardown) {
-    return new AutoValue_DoFnSignature(
-        fnClass,
-        processElement,
-        startBundle,
-        finishBundle,
-        setup,
-        teardown);
+  static Builder builder() {
+    return new AutoValue_DoFnSignature.Builder();
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract Builder setFnClass(Class<? extends DoFn> fnClass);
+    abstract Builder setProcessElement(ProcessElementMethod processElement);
+    abstract Builder setStartBundle(BundleMethod startBundle);
+    abstract Builder setFinishBundle(BundleMethod finishBundle);
+    abstract Builder setSetup(LifecycleMethod setup);
+    abstract Builder setTeardown(LifecycleMethod teardown);
+    abstract DoFnSignature build();
+  }
+
+  /** A method delegated to a annotated method of an underlying {@link DoFn}. */
+  public interface DoFnMethod {
+    /** The annotated method itself. */
+    Method targetMethod();
+  }
+
+  /** A type of optional parameter of the {@link DoFn.ProcessElement} method. */
+  public enum Parameter {
+    BOUNDED_WINDOW,
+    INPUT_PROVIDER,
+    OUTPUT_RECEIVER,
   }
 
   /** Describes a {@link DoFn.ProcessElement} method. */
   @AutoValue
-  public abstract static class ProcessElementMethod {
-    enum Parameter {
-      BOUNDED_WINDOW,
-      INPUT_PROVIDER,
-      OUTPUT_RECEIVER
-    }
-
+  public abstract static class ProcessElementMethod implements DoFnMethod {
+    /** The annotated method itself. */
+    @Override
     public abstract Method targetMethod();
 
+    /** Types of optional parameters of the annotated method, in the order they appear. */
     public abstract List<Parameter> extraParameters();
 
-    static ProcessElementMethod create(Method targetMethod, List<Parameter> extraParameters) {
+    static ProcessElementMethod create(
+        Method targetMethod,
+        List<Parameter> extraParameters) {
       return new AutoValue_DoFnSignature_ProcessElementMethod(
           targetMethod, Collections.unmodifiableList(extraParameters));
     }
 
-    /** @return true if the reflected {@link DoFn} uses a Single Window. */
+    /** Whether this {@link DoFn} uses a Single Window. */
     public boolean usesSingleWindow() {
       return extraParameters().contains(Parameter.BOUNDED_WINDOW);
     }
@@ -90,7 +107,9 @@ public abstract class DoFnSignature {
 
   /** Describes a {@link DoFn.StartBundle} or {@link DoFn.FinishBundle} method. */
   @AutoValue
-  public abstract static class BundleMethod {
+  public abstract static class BundleMethod implements DoFnMethod {
+    /** The annotated method itself. */
+    @Override
     public abstract Method targetMethod();
 
     static BundleMethod create(Method targetMethod) {
@@ -100,7 +119,9 @@ public abstract class DoFnSignature {
 
   /** Describes a {@link DoFn.Setup} or {@link DoFn.Teardown} method. */
   @AutoValue
-  public abstract static class LifecycleMethod {
+  public abstract static class LifecycleMethod implements DoFnMethod {
+    /** The annotated method itself. */
+    @Override
     public abstract Method targetMethod();
 
     static LifecycleMethod create(Method targetMethod) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 7e482d5..8283788 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.reflect.TypeParameter;
 import com.google.common.reflect.TypeToken;
@@ -36,6 +33,7 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
@@ -62,15 +60,17 @@ public class DoFnSignatures {
 
   /** Analyzes a given {@link DoFn} class and extracts its {@link DoFnSignature}. */
   private static DoFnSignature parseSignature(Class<? extends DoFn> fnClass) {
-    TypeToken<?> inputT = null;
-    TypeToken<?> outputT = null;
+    DoFnSignature.Builder builder = DoFnSignature.builder();
+
+    ErrorReporter errors = new ErrorReporter(null, fnClass.getName());
+    errors.checkArgument(DoFn.class.isAssignableFrom(fnClass), "Must be subtype of DoFn");
+    builder.setFnClass(fnClass);
 
-    // Extract the input and output type.
-    checkArgument(
-        DoFn.class.isAssignableFrom(fnClass),
-        "%s must be subtype of DoFn",
-        fnClass.getSimpleName());
     TypeToken<? extends DoFn> fnToken = TypeToken.of(fnClass);
+
+    // Extract the input and output type, and whether the fn is bounded.
+    TypeToken<?> inputT = null;
+    TypeToken<?> outputT = null;
     for (TypeToken<?> supertype : fnToken.getTypes()) {
       if (!supertype.getRawType().equals(DoFn.class)) {
         continue;
@@ -79,25 +79,48 @@ public class DoFnSignatures {
       inputT = TypeToken.of(args[0]);
       outputT = TypeToken.of(args[1]);
     }
-    checkNotNull(inputT, "Unable to determine input type from %s", fnClass);
-
-    Method processElementMethod = findAnnotatedMethod(DoFn.ProcessElement.class, fnClass, true);
-    Method startBundleMethod = findAnnotatedMethod(DoFn.StartBundle.class, fnClass, false);
-    Method finishBundleMethod = findAnnotatedMethod(DoFn.FinishBundle.class, fnClass, false);
-    Method setupMethod = findAnnotatedMethod(DoFn.Setup.class, fnClass, false);
-    Method teardownMethod = findAnnotatedMethod(DoFn.Teardown.class, fnClass, false);
-
-    return DoFnSignature.create(
-        fnClass,
-        analyzeProcessElementMethod(fnToken, processElementMethod, inputT, outputT),
-        (startBundleMethod == null)
-            ? null
-            : analyzeBundleMethod(fnToken, startBundleMethod, inputT, outputT),
-        (finishBundleMethod == null)
-            ? null
-            : analyzeBundleMethod(fnToken, finishBundleMethod, inputT, outputT),
-        (setupMethod == null) ? null : analyzeLifecycleMethod(setupMethod),
-        (teardownMethod == null) ? null : analyzeLifecycleMethod(teardownMethod));
+    errors.checkNotNull(inputT, "Unable to determine input type");
+
+    Method processElementMethod =
+        findAnnotatedMethod(errors, DoFn.ProcessElement.class, fnClass, true);
+    Method startBundleMethod = findAnnotatedMethod(errors, DoFn.StartBundle.class, fnClass, false);
+    Method finishBundleMethod =
+        findAnnotatedMethod(errors, DoFn.FinishBundle.class, fnClass, false);
+    Method setupMethod = findAnnotatedMethod(errors, DoFn.Setup.class, fnClass, false);
+    Method teardownMethod = findAnnotatedMethod(errors, DoFn.Teardown.class, fnClass, false);
+
+    ErrorReporter processElementErrors =
+        errors.forMethod(DoFn.ProcessElement.class, processElementMethod);
+    DoFnSignature.ProcessElementMethod processElement =
+        analyzeProcessElementMethod(
+            processElementErrors, fnToken, processElementMethod, inputT, outputT);
+    builder.setProcessElement(processElement);
+
+    if (startBundleMethod != null) {
+      ErrorReporter startBundleErrors = errors.forMethod(DoFn.StartBundle.class, startBundleMethod);
+      builder.setStartBundle(
+          analyzeBundleMethod(startBundleErrors, fnToken, startBundleMethod, inputT, outputT));
+    }
+
+    if (finishBundleMethod != null) {
+      ErrorReporter finishBundleErrors =
+          errors.forMethod(DoFn.FinishBundle.class, finishBundleMethod);
+      builder.setFinishBundle(
+          analyzeBundleMethod(finishBundleErrors, fnToken, finishBundleMethod, inputT, outputT));
+    }
+
+    if (setupMethod != null) {
+      builder.setSetup(
+          analyzeLifecycleMethod(errors.forMethod(DoFn.Setup.class, setupMethod), setupMethod));
+    }
+
+    if (teardownMethod != null) {
+      builder.setTeardown(
+          analyzeLifecycleMethod(
+              errors.forMethod(DoFn.Teardown.class, teardownMethod), teardownMethod));
+    }
+
+    return builder.build();
   }
 
   /**
@@ -139,10 +162,12 @@ public class DoFnSignatures {
 
   @VisibleForTesting
   static DoFnSignature.ProcessElementMethod analyzeProcessElementMethod(
-      TypeToken<? extends DoFn> fnClass, Method m, TypeToken<?> inputT, TypeToken<?> outputT) {
-    checkArgument(
-        void.class.equals(m.getReturnType()), "%s must have a void return type", format(m));
-    checkArgument(!m.isVarArgs(), "%s must not have var args", format(m));
+      ErrorReporter errors,
+      TypeToken<? extends DoFn> fnClass,
+      Method m,
+      TypeToken<?> inputT,
+      TypeToken<?> outputT) {
+    errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void");
 
     TypeToken<?> processContextToken = doFnProcessContextTypeOf(inputT, outputT);
 
@@ -151,57 +176,49 @@ public class DoFnSignatures {
     if (params.length > 0) {
       contextToken = fnClass.resolveType(params[0]);
     }
-    checkArgument(
+    errors.checkArgument(
         contextToken != null && contextToken.equals(processContextToken),
-        "%s must take a %s as its first argument",
-        format(m),
+        "Must take %s as the first argument",
         formatType(processContextToken));
 
-    List<DoFnSignature.ProcessElementMethod.Parameter> extraParameters = new ArrayList<>();
+    List<DoFnSignature.Parameter> extraParameters = new ArrayList<>();
+
     TypeToken<?> expectedInputProviderT = inputProviderTypeOf(inputT);
     TypeToken<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
     for (int i = 1; i < params.length; ++i) {
-      TypeToken<?> param = fnClass.resolveType(params[i]);
-      Class<?> rawType = param.getRawType();
+      TypeToken<?> paramT = fnClass.resolveType(params[i]);
+      Class<?> rawType = paramT.getRawType();
       if (rawType.equals(BoundedWindow.class)) {
-        checkArgument(
-            !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW),
-            "Multiple BoundedWindow parameters in %s",
-            format(m));
-        extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW);
+        errors.checkArgument(
+            !extraParameters.contains(DoFnSignature.Parameter.BOUNDED_WINDOW),
+            "Multiple BoundedWindow parameters");
+        extraParameters.add(DoFnSignature.Parameter.BOUNDED_WINDOW);
       } else if (rawType.equals(DoFn.InputProvider.class)) {
-        checkArgument(
-            !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER),
-            "Multiple InputProvider parameters in %s",
-            format(m));
-        checkArgument(
-            param.equals(expectedInputProviderT),
-            "Wrong type of InputProvider parameter for method %s: %s, should be %s",
-            format(m),
-            formatType(param),
+        errors.checkArgument(
+            !extraParameters.contains(DoFnSignature.Parameter.INPUT_PROVIDER),
+            "Multiple InputProvider parameters");
+        errors.checkArgument(
+            paramT.equals(expectedInputProviderT),
+            "Wrong type of InputProvider parameter: %s, should be %s",
+            formatType(paramT),
             formatType(expectedInputProviderT));
-        extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER);
+        extraParameters.add(DoFnSignature.Parameter.INPUT_PROVIDER);
       } else if (rawType.equals(DoFn.OutputReceiver.class)) {
-        checkArgument(
-            !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER),
-            "Multiple OutputReceiver parameters in %s",
-            format(m));
-        checkArgument(
-            param.equals(expectedOutputReceiverT),
-            "Wrong type of OutputReceiver parameter for method %s: %s, should be %s",
-            format(m),
-            formatType(param),
+        errors.checkArgument(
+            !extraParameters.contains(DoFnSignature.Parameter.OUTPUT_RECEIVER),
+            "Multiple OutputReceiver parameters");
+        errors.checkArgument(
+            paramT.equals(expectedOutputReceiverT),
+            "Wrong type of OutputReceiver parameter: %s, should be %s",
+            formatType(paramT),
             formatType(expectedOutputReceiverT));
-        extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER);
+        extraParameters.add(DoFnSignature.Parameter.OUTPUT_RECEIVER);
       } else {
         List<String> allowedParamTypes =
             Arrays.asList(formatType(new TypeToken<BoundedWindow>() {}));
-        checkArgument(
-            false,
-            "%s is not a valid context parameter for method %s. Should be one of %s",
-            formatType(param),
-            format(m),
-            allowedParamTypes);
+        errors.throwIllegalArgument(
+            "%s is not a valid context parameter. Should be one of %s",
+            formatType(paramT), allowedParamTypes);
       }
     }
 
@@ -210,35 +227,25 @@ public class DoFnSignatures {
 
   @VisibleForTesting
   static DoFnSignature.BundleMethod analyzeBundleMethod(
-      TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT, TypeToken<?> outputT) {
-    checkArgument(
-        void.class.equals(m.getReturnType()), "%s must have a void return type", format(m));
-    checkArgument(!m.isVarArgs(), "%s must not have var args", format(m));
-
+      ErrorReporter errors,
+      TypeToken<? extends DoFn> fnToken,
+      Method m,
+      TypeToken<?> inputT,
+      TypeToken<?> outputT) {
+    errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void");
     TypeToken<?> expectedContextToken = doFnContextTypeOf(inputT, outputT);
-
     Type[] params = m.getGenericParameterTypes();
-    checkArgument(
-        params.length == 1,
-        "%s must have a single argument of type %s",
-        format(m),
+    errors.checkArgument(
+        params.length == 1 && fnToken.resolveType(params[0]).equals(expectedContextToken),
+        "Must take a single argument of type %s",
         formatType(expectedContextToken));
-    TypeToken<?> contextToken = fnToken.resolveType(params[0]);
-    checkArgument(
-        contextToken.equals(expectedContextToken),
-        "Wrong type of context argument to %s: %s, must be %s",
-        format(m),
-        formatType(contextToken),
-        formatType(expectedContextToken));
-
     return DoFnSignature.BundleMethod.create(m);
   }
 
-  private static DoFnSignature.LifecycleMethod analyzeLifecycleMethod(Method m) {
-    checkArgument(
-        void.class.equals(m.getReturnType()), "%s must have a void return type", format(m));
-    checkArgument(
-        m.getGenericParameterTypes().length == 0, "%s must take zero arguments", format(m));
+  private static DoFnSignature.LifecycleMethod analyzeLifecycleMethod(
+      ErrorReporter errors, Method m) {
+    errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void");
+    errors.checkArgument(m.getGenericParameterTypes().length == 0, "Must take zero arguments");
     return DoFnSignature.LifecycleMethod.create(m);
   }
 
@@ -272,15 +279,11 @@ public class DoFnSignatures {
   }
 
   private static Method findAnnotatedMethod(
-      Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
+      ErrorReporter errors, Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
     Collection<Method> matches = declaredMethodsWithAnnotation(anno, fnClazz, DoFn.class);
 
     if (matches.size() == 0) {
-      checkArgument(
-          !required,
-          "No method annotated with @%s found in %s",
-          anno.getSimpleName(),
-          fnClazz.getName());
+      errors.checkArgument(!required, "No method annotated with @%s found", anno.getSimpleName());
       return null;
     }
 
@@ -289,7 +292,7 @@ public class DoFnSignatures {
     // classes).
     Method first = matches.iterator().next();
     for (Method other : matches) {
-      checkArgument(
+      errors.checkArgument(
           first.getName().equals(other.getName())
               && Arrays.equals(first.getParameterTypes(), other.getParameterTypes()),
           "Found multiple methods annotated with @%s. [%s] and [%s]",
@@ -298,22 +301,50 @@ public class DoFnSignatures {
           format(other));
     }
 
+    ErrorReporter methodErrors = errors.forMethod(anno, first);
     // We need to be able to call it. We require it is public.
-    checkArgument(
-        (first.getModifiers() & Modifier.PUBLIC) != 0, "%s must be public", format(first));
-
+    methodErrors.checkArgument((first.getModifiers() & Modifier.PUBLIC) != 0, "Must be public");
     // And make sure its not static.
-    checkArgument(
-        (first.getModifiers() & Modifier.STATIC) == 0, "%s must not be static", format(first));
+    methodErrors.checkArgument((first.getModifiers() & Modifier.STATIC) == 0, "Must not be static");
 
     return first;
   }
 
-  private static String format(Method m) {
-    return ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(m);
+  private static String format(Method method) {
+    return ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(method);
   }
 
   private static String formatType(TypeToken<?> t) {
     return ReflectHelpers.TYPE_SIMPLE_DESCRIPTION.apply(t.getType());
   }
+
+  static class ErrorReporter {
+    private final String label;
+
+    ErrorReporter(@Nullable ErrorReporter root, String label) {
+      this.label = (root == null) ? label : String.format("%s, %s", root.label, label);
+    }
+
+    ErrorReporter forMethod(Class<? extends Annotation> annotation, Method method) {
+      return new ErrorReporter(
+          this,
+          String.format("@%s %s", annotation, (method == null) ? "(absent)" : format(method)));
+    }
+
+    void throwIllegalArgument(String message, Object... args) {
+      throw new IllegalArgumentException(label + ": " + String.format(message, args));
+    }
+
+    public void checkArgument(boolean condition, String message, Object... args) {
+      if (!condition) {
+        throwIllegalArgument(message, args);
+      }
+    }
+
+    public void checkNotNull(Object value, String message, Object... args) {
+      if (value == null) {
+        throwIllegalArgument(message, args);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
index 2034eba..2d92162 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
@@ -33,8 +33,6 @@ import java.lang.reflect.Type;
 import java.lang.reflect.TypeVariable;
 import java.lang.reflect.WildcardType;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedHashSet;
 import java.util.Queue;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -153,26 +151,6 @@ public class ReflectHelpers {
   };
 
   /**
-   * Returns all interfaces of the given clazz.
-   * @param clazz
-   * @return
-   */
-  public static FluentIterable<Class<?>> getClosureOfInterfaces(Class<?> clazz) {
-    checkNotNull(clazz);
-    Queue<Class<?>> interfacesToProcess = Queues.newArrayDeque();
-    Collections.addAll(interfacesToProcess, clazz.getInterfaces());
-
-    LinkedHashSet<Class<?>> interfaces = new LinkedHashSet<>();
-    while (!interfacesToProcess.isEmpty()) {
-      Class<?> current = interfacesToProcess.remove();
-      if (interfaces.add(current)) {
-        Collections.addAll(interfacesToProcess, current.getInterfaces());
-      }
-    }
-    return FluentIterable.from(interfaces);
-  }
-
-  /**
    * Returns all the methods visible from the provided interfaces.
    *
    * @param interfaces The interfaces to use when searching for all their methods.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 9317ea2..e59cce8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -17,11 +17,12 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.junit.Before;
@@ -36,20 +37,6 @@ import org.mockito.MockitoAnnotations;
 /** Tests for {@link DoFnInvokers}. */
 @RunWith(JUnit4.class)
 public class DoFnInvokersTest {
-  /** A convenience struct holding flags that indicate whether a particular method was invoked. */
-  public static class Invocations {
-    public boolean wasProcessElementInvoked = false;
-    public boolean wasStartBundleInvoked = false;
-    public boolean wasFinishBundleInvoked = false;
-    public boolean wasSetupInvoked = false;
-    public boolean wasTeardownInvoked = false;
-    private final String name;
-
-    public Invocations(String name) {
-      this.name = name;
-    }
-  }
-
   @Rule public ExpectedException thrown = ExpectedException.none();
 
   @Mock private DoFn.ProcessContext mockContext;
@@ -81,102 +68,10 @@ public class DoFnInvokersTest {
         };
   }
 
-  private void checkInvokeProcessElementWorks(DoFn<String, String> fn, Invocations... invocations)
-      throws Exception {
-    assertTrue("Need at least one invocation to check", invocations.length >= 1);
-    for (Invocations invocation : invocations) {
-      assertFalse(
-          "Should not yet have called processElement on " + invocation.name,
-          invocation.wasProcessElementInvoked);
-    }
+  private void invokeProcessElement(DoFn<String, String> fn) {
     DoFnInvokers.INSTANCE
         .newByteBuddyInvoker(fn)
         .invokeProcessElement(mockContext, extraContextFactory);
-    for (Invocations invocation : invocations) {
-      assertTrue(
-          "Should have called processElement on " + invocation.name,
-          invocation.wasProcessElementInvoked);
-    }
-  }
-
-  private void checkInvokeStartBundleWorks(DoFn<String, String> fn, Invocations... invocations)
-      throws Exception {
-    assertTrue("Need at least one invocation to check", invocations.length >= 1);
-    for (Invocations invocation : invocations) {
-      assertFalse(
-          "Should not yet have called startBundle on " + invocation.name,
-          invocation.wasStartBundleInvoked);
-    }
-    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeStartBundle(mockContext);
-    for (Invocations invocation : invocations) {
-      assertTrue(
-          "Should have called startBundle on " + invocation.name, invocation.wasStartBundleInvoked);
-    }
-  }
-
-  private void checkInvokeFinishBundleWorks(DoFn<String, String> fn, Invocations... invocations)
-      throws Exception {
-    assertTrue("Need at least one invocation to check", invocations.length >= 1);
-    for (Invocations invocation : invocations) {
-      assertFalse(
-          "Should not yet have called finishBundle on " + invocation.name,
-          invocation.wasFinishBundleInvoked);
-    }
-    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeFinishBundle(mockContext);
-    for (Invocations invocation : invocations) {
-      assertTrue(
-          "Should have called finishBundle on " + invocation.name,
-          invocation.wasFinishBundleInvoked);
-    }
-  }
-
-  private void checkInvokeSetupWorks(DoFn<String, String> fn, Invocations... invocations)
-      throws Exception {
-    assertTrue("Need at least one invocation to check", invocations.length >= 1);
-    for (Invocations invocation : invocations) {
-      assertFalse(
-          "Should not yet have called setup on " + invocation.name, invocation.wasSetupInvoked);
-    }
-    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeSetup();
-    for (Invocations invocation : invocations) {
-      assertTrue("Should have called setup on " + invocation.name, invocation.wasSetupInvoked);
-    }
-  }
-
-  private void checkInvokeTeardownWorks(DoFn<String, String> fn, Invocations... invocations)
-      throws Exception {
-    assertTrue("Need at least one invocation to check", invocations.length >= 1);
-    for (Invocations invocation : invocations) {
-      assertFalse(
-          "Should not yet have called teardown on " + invocation.name,
-          invocation.wasTeardownInvoked);
-    }
-    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeTeardown();
-    for (Invocations invocation : invocations) {
-      assertTrue(
-          "Should have called teardown on " + invocation.name, invocation.wasTeardownInvoked);
-    }
-  }
-
-  @Test
-  public void testDoFnWithNoExtraContext() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFn<String, String> fn =
-        new DoFn<String, String>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) throws Exception {
-            invocations.wasProcessElementInvoked = true;
-            assertSame(c, mockContext);
-          }
-        };
-
-    assertFalse(
-        DoFnSignatures.INSTANCE
-            .getOrParseSignature(fn.getClass())
-            .processElement()
-            .usesSingleWindow());
-
-    checkInvokeProcessElementWorks(fn, invocations);
   }
 
   @Test
@@ -190,6 +85,21 @@ public class DoFnInvokersTest {
         DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn2).getClass());
   }
 
+  // ---------------------------------------------------------------------------------------
+  // Tests for general invocations of DoFn methods.
+  // ---------------------------------------------------------------------------------------
+
+  @Test
+  public void testDoFnWithNoExtraContext() throws Exception {
+    class MockFn extends DoFn<String, String> {
+      @ProcessElement
+      public void processElement(ProcessContext c) throws Exception {}
+    }
+    MockFn fn = mock(MockFn.class);
+    invokeProcessElement(fn);
+    verify(fn).processElement(mockContext);
+  }
+
   interface InterfaceWithProcessElement {
     @DoFn.ProcessElement
     void processElement(DoFn<String, String>.ProcessContext c);
@@ -199,302 +109,221 @@ public class DoFnInvokersTest {
 
   private class IdentityUsingInterfaceWithProcessElement extends DoFn<String, String>
       implements LayersOfInterfaces {
-
-    private Invocations invocations = new Invocations("Named Class");
-
     @Override
-    public void processElement(DoFn<String, String>.ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-      assertSame(c, mockContext);
-    }
+    public void processElement(DoFn<String, String>.ProcessContext c) {}
   }
 
   @Test
   public void testDoFnWithProcessElementInterface() throws Exception {
-    IdentityUsingInterfaceWithProcessElement fn = new IdentityUsingInterfaceWithProcessElement();
-    assertFalse(
-        DoFnSignatures.INSTANCE
-            .getOrParseSignature(fn.getClass())
-            .processElement()
-            .usesSingleWindow());
-    checkInvokeProcessElementWorks(fn, fn.invocations);
+    IdentityUsingInterfaceWithProcessElement fn =
+        mock(IdentityUsingInterfaceWithProcessElement.class);
+    invokeProcessElement(fn);
+    verify(fn).processElement(mockContext);
   }
 
   private class IdentityParent extends DoFn<String, String> {
-    protected Invocations parentInvocations = new Invocations("IdentityParent");
-
     @ProcessElement
-    public void process(ProcessContext c) {
-      parentInvocations.wasProcessElementInvoked = true;
-      assertSame(c, mockContext);
-    }
+    public void process(ProcessContext c) {}
   }
 
   private class IdentityChildWithoutOverride extends IdentityParent {}
 
   private class IdentityChildWithOverride extends IdentityParent {
-    protected Invocations childInvocations = new Invocations("IdentityChildWithOverride");
-
     @Override
     public void process(DoFn<String, String>.ProcessContext c) {
       super.process(c);
-      childInvocations.wasProcessElementInvoked = true;
     }
   }
 
   @Test
   public void testDoFnWithMethodInSuperclass() throws Exception {
-    IdentityChildWithoutOverride fn = new IdentityChildWithoutOverride();
-    assertFalse(
-        DoFnSignatures.INSTANCE
-            .getOrParseSignature(fn.getClass())
-            .processElement()
-            .usesSingleWindow());
-    checkInvokeProcessElementWorks(fn, fn.parentInvocations);
+    IdentityChildWithoutOverride fn = mock(IdentityChildWithoutOverride.class);
+    invokeProcessElement(fn);
+    verify(fn).process(mockContext);
   }
 
   @Test
   public void testDoFnWithMethodInSubclass() throws Exception {
-    IdentityChildWithOverride fn = new IdentityChildWithOverride();
-    assertFalse(
-        DoFnSignatures.INSTANCE
-            .getOrParseSignature(fn.getClass())
-            .processElement()
-            .usesSingleWindow());
-    checkInvokeProcessElementWorks(fn, fn.parentInvocations, fn.childInvocations);
+    IdentityChildWithOverride fn = mock(IdentityChildWithOverride.class);
+    invokeProcessElement(fn);
+    verify(fn).process(mockContext);
   }
 
   @Test
   public void testDoFnWithWindow() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFn<String, String> fn =
-        new DoFn<String, String>() {
-          @ProcessElement
-          public void processElement(ProcessContext c, BoundedWindow w) throws Exception {
-            invocations.wasProcessElementInvoked = true;
-            assertSame(c, mockContext);
-            assertSame(w, mockWindow);
-          }
-        };
-
-    assertTrue(
-        DoFnSignatures.INSTANCE
-            .getOrParseSignature(fn.getClass())
-            .processElement()
-            .usesSingleWindow());
-
-    checkInvokeProcessElementWorks(fn, invocations);
+    class MockFn extends DoFn<String, String> {
+      @DoFn.ProcessElement
+      public void processElement(ProcessContext c, BoundedWindow w) throws Exception {}
+    }
+    MockFn fn = mock(MockFn.class);
+    invokeProcessElement(fn);
+    verify(fn).processElement(mockContext, mockWindow);
   }
 
   @Test
   public void testDoFnWithOutputReceiver() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFn<String, String> fn =
-        new DoFn<String, String>() {
-          @ProcessElement
-          public void processElement(ProcessContext c, OutputReceiver<String> o) throws Exception {
-            invocations.wasProcessElementInvoked = true;
-            assertSame(c, mockContext);
-            assertSame(o, mockOutputReceiver);
-          }
-        };
-
-    assertFalse(
-        DoFnSignatures.INSTANCE
-            .getOrParseSignature(fn.getClass())
-            .processElement()
-            .usesSingleWindow());
-
-    checkInvokeProcessElementWorks(fn, invocations);
+    class MockFn extends DoFn<String, String> {
+      @DoFn.ProcessElement
+      public void processElement(ProcessContext c, OutputReceiver<String> o) throws Exception {}
+    }
+    MockFn fn = mock(MockFn.class);
+    invokeProcessElement(fn);
+    verify(fn).processElement(mockContext, mockOutputReceiver);
   }
 
   @Test
   public void testDoFnWithInputProvider() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFn<String, String> fn =
-        new DoFn<String, String>() {
-          @ProcessElement
-          public void processElement(ProcessContext c, InputProvider<String> i) throws Exception {
-            invocations.wasProcessElementInvoked = true;
-            assertSame(c, mockContext);
-            assertSame(i, mockInputProvider);
-          }
-        };
-
-    assertFalse(
-        DoFnSignatures.INSTANCE
-            .getOrParseSignature(fn.getClass())
-            .processElement()
-            .usesSingleWindow());
-
-    checkInvokeProcessElementWorks(fn, invocations);
-  }
-
-  @Test
-  public void testDoFnWithStartBundle() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFn<String, String> fn =
-        new DoFn<String, String>() {
-          @ProcessElement
-          public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
-
-          @StartBundle
-          public void startBundle(Context c) {
-            invocations.wasStartBundleInvoked = true;
-            assertSame(c, mockContext);
-          }
-
-          @FinishBundle
-          public void finishBundle(Context c) {
-            invocations.wasFinishBundleInvoked = true;
-            assertSame(c, mockContext);
-          }
-        };
-
-    checkInvokeStartBundleWorks(fn, invocations);
-    checkInvokeFinishBundleWorks(fn, invocations);
+    class MockFn extends DoFn<String, String> {
+      @DoFn.ProcessElement
+      public void processElement(ProcessContext c, InputProvider<String> o) throws Exception {}
+    }
+    MockFn fn = mock(MockFn.class);
+    invokeProcessElement(fn);
+    verify(fn).processElement(mockContext, mockInputProvider);
   }
 
   @Test
-  public void testDoFnWithSetupTeardown() throws Exception {
-    final Invocations invocations = new Invocations("AnonymousClass");
-    DoFn<String, String> fn =
-        new DoFn<String, String>() {
-          @ProcessElement
-          public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
-
-          @StartBundle
-          public void startBundle(Context c) {
-            invocations.wasStartBundleInvoked = true;
-            assertSame(c, mockContext);
-          }
+  public void testDoFnWithStartBundleSetupTeardown() throws Exception {
+    class MockFn extends DoFn<String, String> {
+      @ProcessElement
+      public void processElement(ProcessContext c) {}
 
-          @FinishBundle
-          public void finishBundle(Context c) {
-            invocations.wasFinishBundleInvoked = true;
-            assertSame(c, mockContext);
-          }
+      @StartBundle
+      public void startBundle(Context c) {}
 
-          @Setup
-          public void before() {
-            invocations.wasSetupInvoked = true;
-          }
+      @FinishBundle
+      public void finishBundle(Context c) {}
 
-          @Teardown
-          public void after() {
-            invocations.wasTeardownInvoked = true;
-          }
-        };
+      @Setup
+      public void before() {}
 
-    checkInvokeSetupWorks(fn, invocations);
-    checkInvokeTeardownWorks(fn, invocations);
-  }
+      @Teardown
+      public void after() {}
+    }
+    MockFn fn = mock(MockFn.class);
+    DoFnInvoker<String, String> invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+    invoker.invokeSetup();
+    invoker.invokeStartBundle(mockContext);
+    invoker.invokeFinishBundle(mockContext);
+    invoker.invokeTeardown();
+    verify(fn).before();
+    verify(fn).startBundle(mockContext);
+    verify(fn).finishBundle(mockContext);
+    verify(fn).after();
+  }
+
+  // ---------------------------------------------------------------------------------------
+  // Tests for ability to invoke private, inner and anonymous classes.
+  // ---------------------------------------------------------------------------------------
 
   private static class PrivateDoFnClass extends DoFn<String, String> {
-    final Invocations invocations = new Invocations(getClass().getName());
-
     @ProcessElement
-    public void processThis(ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-    }
+    public void processThis(ProcessContext c) {}
   }
 
   @Test
   public void testLocalPrivateDoFnClass() throws Exception {
-    PrivateDoFnClass fn = new PrivateDoFnClass();
-    checkInvokeProcessElementWorks(fn, fn.invocations);
+    PrivateDoFnClass fn = mock(PrivateDoFnClass.class);
+    invokeProcessElement(fn);
+    verify(fn).processThis(mockContext);
   }
 
   @Test
   public void testStaticPackagePrivateDoFnClass() throws Exception {
-    Invocations invocations = new Invocations("StaticPackagePrivateDoFn");
-    checkInvokeProcessElementWorks(
-        DoFnInvokersTestHelper.newStaticPackagePrivateDoFn(invocations), invocations);
+    DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFn().getClass());
+    invokeProcessElement(fn);
+    DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFn(fn, mockContext);
   }
 
   @Test
   public void testInnerPackagePrivateDoFnClass() throws Exception {
-    Invocations invocations = new Invocations("InnerPackagePrivateDoFn");
-    checkInvokeProcessElementWorks(
-        new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn(invocations), invocations);
+    DoFn<String, String> fn =
+        mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn().getClass());
+    invokeProcessElement(fn);
+    DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFn(fn, mockContext);
   }
 
   @Test
   public void testStaticPrivateDoFnClass() throws Exception {
-    Invocations invocations = new Invocations("StaticPrivateDoFn");
-    checkInvokeProcessElementWorks(
-        DoFnInvokersTestHelper.newStaticPrivateDoFn(invocations), invocations);
+    DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPrivateDoFn().getClass());
+    invokeProcessElement(fn);
+    DoFnInvokersTestHelper.verifyStaticPrivateDoFn(fn, mockContext);
   }
 
   @Test
   public void testInnerPrivateDoFnClass() throws Exception {
-    Invocations invocations = new Invocations("StaticInnerDoFn");
-    checkInvokeProcessElementWorks(
-        new DoFnInvokersTestHelper().newInnerPrivateDoFn(invocations), invocations);
+    DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerPrivateDoFn().getClass());
+    invokeProcessElement(fn);
+    DoFnInvokersTestHelper.verifyInnerPrivateDoFn(fn, mockContext);
   }
 
   @Test
-  public void testAnonymousInnerDoFnInOtherPackage() throws Exception {
-    Invocations invocations = new Invocations("AnonymousInnerDoFnInOtherPackage");
-    checkInvokeProcessElementWorks(
-        new DoFnInvokersTestHelper().newInnerAnonymousDoFn(invocations), invocations);
+  public void testAnonymousInnerDoFn() throws Exception {
+    DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFn().getClass());
+    invokeProcessElement(fn);
+    DoFnInvokersTestHelper.verifyInnerAnonymousDoFn(fn, mockContext);
   }
 
   @Test
   public void testStaticAnonymousDoFnInOtherPackage() throws Exception {
-    Invocations invocations = new Invocations("AnonymousStaticDoFnInOtherPackage");
-    checkInvokeProcessElementWorks(
-        DoFnInvokersTestHelper.newStaticAnonymousDoFn(invocations), invocations);
+    // Can't use mockito for this one - the anonymous class is final and can't be mocked.
+    DoFn<String, String> fn = DoFnInvokersTestHelper.newStaticAnonymousDoFn();
+    invokeProcessElement(fn);
+    DoFnInvokersTestHelper.verifyStaticAnonymousDoFnInvoked(fn, mockContext);
   }
 
+  // ---------------------------------------------------------------------------------------
+  // Tests for wrapping exceptions.
+  // ---------------------------------------------------------------------------------------
+
   @Test
   public void testProcessElementException() throws Exception {
-    DoFn<Integer, Integer> fn =
-        new DoFn<Integer, Integer>() {
-          @ProcessElement
-          public void processElement(@SuppressWarnings("unused") ProcessContext c) {
-            throw new IllegalArgumentException("bogus");
-          }
-        };
-
+    DoFnInvoker<Integer, Integer> invoker =
+        DoFnInvokers.INSTANCE.newByteBuddyInvoker(
+            new DoFn<Integer, Integer>() {
+              @ProcessElement
+              public void processElement(@SuppressWarnings("unused") ProcessContext c) {
+                throw new IllegalArgumentException("bogus");
+              }
+            });
     thrown.expect(UserCodeException.class);
     thrown.expectMessage("bogus");
-    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeProcessElement(null, null);
+    invoker.invokeProcessElement(null, null);
   }
 
   @Test
   public void testStartBundleException() throws Exception {
-    DoFn<Integer, Integer> fn =
-        new DoFn<Integer, Integer>() {
-          @StartBundle
-          public void startBundle(@SuppressWarnings("unused") Context c) {
-            throw new IllegalArgumentException("bogus");
-          }
-
-          @ProcessElement
-          public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
-        };
-
+    DoFnInvoker<Integer, Integer> invoker =
+        DoFnInvokers.INSTANCE.newByteBuddyInvoker(
+            new DoFn<Integer, Integer>() {
+              @StartBundle
+              public void startBundle(@SuppressWarnings("unused") Context c) {
+                throw new IllegalArgumentException("bogus");
+              }
+
+              @ProcessElement
+              public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+            });
     thrown.expect(UserCodeException.class);
     thrown.expectMessage("bogus");
-    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeStartBundle(null);
+    invoker.invokeStartBundle(null);
   }
 
   @Test
   public void testFinishBundleException() throws Exception {
-    DoFn<Integer, Integer> fn =
-        new DoFn<Integer, Integer>() {
-          @FinishBundle
-          public void finishBundle(@SuppressWarnings("unused") Context c) {
-            throw new IllegalArgumentException("bogus");
-          }
-
-          @ProcessElement
-          public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
-        };
-
+    DoFnInvoker<Integer, Integer> invoker =
+        DoFnInvokers.INSTANCE.newByteBuddyInvoker(
+            new DoFn<Integer, Integer>() {
+              @FinishBundle
+              public void finishBundle(@SuppressWarnings("unused") Context c) {
+                throw new IllegalArgumentException("bogus");
+              }
+
+              @ProcessElement
+              public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+            });
     thrown.expect(UserCodeException.class);
     thrown.expectMessage("bogus");
-    DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeFinishBundle(null);
+    invoker.invokeFinishBundle(null);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
deleted file mode 100644
index 7bfdddc..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.reflect;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokersTest.Invocations;
-
-/**
- * Test helper for {@link DoFnInvokersTest}, which needs to test package-private access
- * to DoFns in other packages.
- */
-public class DoFnInvokersTestHelper {
-
-  private static class StaticPrivateDoFn extends DoFn<String, String> {
-    final Invocations invocations;
-
-    public StaticPrivateDoFn(Invocations invocations) {
-      this.invocations = invocations;
-    }
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-    }
-  }
-
-  private class InnerPrivateDoFn extends DoFn<String, String> {
-    final Invocations invocations;
-
-    public InnerPrivateDoFn(Invocations invocations) {
-      this.invocations = invocations;
-    }
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-    }
-  }
-
-  static class StaticPackagePrivateDoFn extends DoFn<String, String> {
-    final Invocations invocations;
-
-    public StaticPackagePrivateDoFn(Invocations invocations) {
-      this.invocations = invocations;
-    }
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-    }
-  }
-
-  class InnerPackagePrivateDoFn extends DoFn<String, String> {
-    final Invocations invocations;
-
-    public InnerPackagePrivateDoFn(Invocations invocations) {
-      this.invocations = invocations;
-    }
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      invocations.wasProcessElementInvoked = true;
-    }
-  }
-
-  public static DoFn<String, String> newStaticPackagePrivateDoFn(
-      Invocations invocations) {
-    return new StaticPackagePrivateDoFn(invocations);
-  }
-
-  public DoFn<String, String> newInnerPackagePrivateDoFn(Invocations invocations) {
-    return new InnerPackagePrivateDoFn(invocations);
-  }
-
-  public static DoFn<String, String> newStaticPrivateDoFn(Invocations invocations) {
-    return new StaticPrivateDoFn(invocations);
-  }
-
-  public DoFn<String, String> newInnerPrivateDoFn(Invocations invocations) {
-    return new InnerPrivateDoFn(invocations);
-  }
-
-  public DoFn<String, String> newInnerAnonymousDoFn(final Invocations invocations) {
-    return new DoFn<String, String>() {
-      @ProcessElement
-      public void process(ProcessContext c) {
-        invocations.wasProcessElementInvoked = true;
-      }
-    };
-  }
-
-  public static DoFn<String, String> newStaticAnonymousDoFn(
-      final Invocations invocations) {
-    return new DoFn<String, String>() {
-      @ProcessElement
-      public void process(ProcessContext c) {
-        invocations.wasProcessElementInvoked = true;
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
new file mode 100644
index 0000000..c269dbd
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.analyzeProcessElementMethod;
+
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.AnonymousMethod;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link DoFnSignatures} verification of the {@link DoFn.ProcessElement} method. */
+@SuppressWarnings("unused")
+@RunWith(JUnit4.class)
+public class DoFnSignaturesProcessElementTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testMissingProcessContext() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Must take ProcessContext<> as the first argument");
+
+    analyzeProcessElementMethod(
+        new AnonymousMethod() {
+          private void method() {}
+        });
+  }
+
+  @Test
+  public void testBadProcessContextType() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Must take ProcessContext<> as the first argument");
+
+    analyzeProcessElementMethod(
+        new AnonymousMethod() {
+          private void method(String s) {}
+        });
+  }
+
+  @Test
+  public void testBadExtraProcessContextType() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Integer is not a valid context parameter. "
+            + "Should be one of [BoundedWindow]");
+
+    analyzeProcessElementMethod(
+        new AnonymousMethod() {
+          private void method(DoFn<Integer, String>.ProcessContext c, Integer n) {}
+        });
+  }
+
+  @Test
+  public void testBadReturnType() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Must return void");
+
+    analyzeProcessElementMethod(
+        new AnonymousMethod() {
+          private int method(DoFn<Integer, String>.ProcessContext context) {
+            return 0;
+          }
+        });
+  }
+
+  @Test
+  public void testGoodConcreteTypes() throws Exception {
+    analyzeProcessElementMethod(
+        new AnonymousMethod() {
+          private void method(
+              DoFn<Integer, String>.ProcessContext c,
+              DoFn.InputProvider<Integer> input,
+              DoFn.OutputReceiver<String> output) {}
+        });
+  }
+
+  @Test
+  public void testBadGenericsTwoArgs() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Wrong type of OutputReceiver parameter: "
+            + "OutputReceiver<Integer>, should be OutputReceiver<String>");
+
+    analyzeProcessElementMethod(
+        new AnonymousMethod() {
+          private void method(
+              DoFn<Integer, String>.ProcessContext c,
+              DoFn.InputProvider<Integer> input,
+              DoFn.OutputReceiver<Integer> output) {}
+        });
+  }
+
+  @Test
+  public void testBadGenericWildCards() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Wrong type of OutputReceiver parameter: "
+            + "OutputReceiver<? super Integer>, should be OutputReceiver<String>");
+
+    analyzeProcessElementMethod(
+        new AnonymousMethod() {
+          private void method(
+              DoFn<Integer, String>.ProcessContext c,
+              DoFn.InputProvider<Integer> input,
+              DoFn.OutputReceiver<? super Integer> output) {}
+        });
+  }
+
+  static class BadTypeVariables<InputT, OutputT> extends DoFn<InputT, OutputT> {
+    @ProcessElement
+    @SuppressWarnings("unused")
+    public void badTypeVariables(
+        DoFn<InputT, OutputT>.ProcessContext c,
+        DoFn.InputProvider<InputT> input,
+        DoFn.OutputReceiver<InputT> output) {}
+  }
+
+  @Test
+  public void testBadTypeVariables() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Wrong type of OutputReceiver parameter: "
+            + "OutputReceiver<InputT>, should be OutputReceiver<OutputT>");
+
+    DoFnSignatures.INSTANCE.getOrParseSignature(BadTypeVariables.class);
+  }
+
+  @Test
+  public void testNoProcessElement() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("No method annotated with @ProcessElement found");
+    thrown.expectMessage(getClass().getName() + "$");
+    DoFnSignatures.INSTANCE.getOrParseSignature(new DoFn<String, String>() {}.getClass());
+  }
+
+  @Test
+  public void testMultipleProcessElement() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Found multiple methods annotated with @ProcessElement");
+    thrown.expectMessage("foo()");
+    thrown.expectMessage("bar()");
+    thrown.expectMessage(getClass().getName() + "$");
+    DoFnSignatures.INSTANCE.getOrParseSignature(
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void foo() {}
+
+          @ProcessElement
+          public void bar() {}
+        }.getClass());
+  }
+
+  @Test
+  public void testPrivateProcessElement() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("process()");
+    thrown.expectMessage("Must be public");
+    thrown.expectMessage(getClass().getName() + "$");
+    DoFnSignatures.INSTANCE.getOrParseSignature(
+        new DoFn<String, String>() {
+          @ProcessElement
+          private void process() {}
+        }.getClass());
+  }
+
+  private static class GoodTypeVariables<InputT, OutputT> extends DoFn<InputT, OutputT> {
+    @ProcessElement
+    @SuppressWarnings("unused")
+    public void goodTypeVariables(
+        DoFn<InputT, OutputT>.ProcessContext c,
+        DoFn.InputProvider<InputT> input,
+        DoFn.OutputReceiver<OutputT> output) {}
+  }
+
+  @Test
+  public void testGoodTypeVariables() throws Exception {
+    DoFnSignatures.INSTANCE.getOrParseSignature(GoodTypeVariables.class);
+  }
+
+  private static class IdentityFn<T> extends DoFn<T, T> {
+    @ProcessElement
+    @SuppressWarnings("unused")
+    public void processElement(ProcessContext c, InputProvider<T> input, OutputReceiver<T> output) {
+      c.output(c.element());
+    }
+  }
+
+  private static class IdentityListFn<T> extends IdentityFn<List<T>> {}
+
+  @Test
+  public void testIdentityFnApplied() throws Exception {
+    DoFnSignatures.INSTANCE.getOrParseSignature(new IdentityFn<String>() {}.getClass());
+  }
+}