You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/09/12 17:58:34 UTC
[2/3] incubator-beam git commit: More changes to DoFn{Signatures,
Invokers}
More changes to DoFn{Signatures,Invokers}
In preparation for Splittable DoFn:
* More generic code generation in DoFnInvokers:
supports methods with return values (thanks @bjchambers).
* Uses AutoValue builder in DoFnSignature.
* Contextual error reporting in DoFnSignatures parsing code.
* Rewrote DoFnInvokers tests to use Mockito.
* Changed DoFnSignatures tests to use local classes
and an "AnonymousMethod" class for testing analysis of
single methods.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/75c8bb8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/75c8bb8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/75c8bb8d
Branch: refs/heads/master
Commit: 75c8bb8dd3127b4dce63e8359ae27185f246b28c
Parents: 05c6c27
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 11 17:13:53 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Mon Sep 12 10:08:57 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/transforms/DoFnAdapters.java | 2 +-
.../sdk/transforms/reflect/DoFnInvoker.java | 20 +-
.../sdk/transforms/reflect/DoFnInvokers.java | 408 +++++++++--------
.../sdk/transforms/reflect/DoFnSignature.java | 71 ++-
.../sdk/transforms/reflect/DoFnSignatures.java | 245 +++++-----
.../beam/sdk/util/common/ReflectHelpers.java | 22 -
.../transforms/reflect/DoFnInvokersTest.java | 455 ++++++-------------
.../reflect/DoFnInvokersTestHelper.java | 116 -----
.../DoFnSignaturesProcessElementTest.java | 213 +++++++++
.../transforms/reflect/DoFnSignaturesTest.java | 269 +----------
.../reflect/DoFnSignaturesTestUtils.java | 64 +++
.../testhelper/DoFnInvokersTestHelper.java | 124 +++++
12 files changed, 968 insertions(+), 1041 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 4803d77..77a71e9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -86,7 +86,7 @@ public class DoFnAdapters {
@Override
public void startBundle(Context c) throws Exception {
- this.fn.prepareForProcessing();
+ fn.prepareForProcessing();
invoker.invokeStartBundle(new ContextAdapter<>(fn, c));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 5818a59..9de6759 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -26,28 +26,16 @@ import org.apache.beam.sdk.transforms.DoFn;
* referred to as the bound {@link DoFn}.
*/
public interface DoFnInvoker<InputT, OutputT> {
- /**
- * Invoke the {@link DoFn.Setup} method on the bound {@link DoFn}.
- */
+ /** Invoke the {@link DoFn.Setup} method on the bound {@link DoFn}. */
void invokeSetup();
- /**
- * Invoke the {@link DoFn.StartBundle} method on the bound {@link DoFn}.
- *
- * @param c The {@link DoFn.Context} to invoke the fn with.
- */
+ /** Invoke the {@link DoFn.StartBundle} method on the bound {@link DoFn}. */
void invokeStartBundle(DoFn<InputT, OutputT>.Context c);
- /**
- * Invoke the {@link DoFn.FinishBundle} method on the bound {@link DoFn}.
- *
- * @param c The {@link DoFn.Context} to invoke the fn with.
- */
+ /** Invoke the {@link DoFn.FinishBundle} method on the bound {@link DoFn}. */
void invokeFinishBundle(DoFn<InputT, OutputT>.Context c);
- /**
- * Invoke the {@link DoFn.Teardown} method on the bound {@link DoFn}.
- */
+ /** Invoke the {@link DoFn.Teardown} method on the bound {@link DoFn}. */
void invokeTeardown();
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index 68e2ca9..f622015 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -18,10 +18,10 @@
package org.apache.beam.sdk.transforms.reflect;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
@@ -39,12 +39,15 @@ import net.bytebuddy.dynamic.DynamicType;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.dynamic.scaffold.InstrumentedType;
import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
+import net.bytebuddy.implementation.FixedValue;
import net.bytebuddy.implementation.Implementation;
-import net.bytebuddy.implementation.MethodCall;
-import net.bytebuddy.implementation.bind.MethodDelegationBinder;
+import net.bytebuddy.implementation.Implementation.Context;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.TargetMethodAnnotationDrivenBinder;
import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
import net.bytebuddy.implementation.bytecode.StackManipulation;
import net.bytebuddy.implementation.bytecode.Throw;
+import net.bytebuddy.implementation.bytecode.assign.Assigner;
import net.bytebuddy.implementation.bytecode.member.FieldAccess;
import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
import net.bytebuddy.implementation.bytecode.member.MethodReturn;
@@ -52,13 +55,10 @@ import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
import net.bytebuddy.jar.asm.Label;
import net.bytebuddy.jar.asm.MethodVisitor;
import net.bytebuddy.jar.asm.Opcodes;
+import net.bytebuddy.jar.asm.Type;
import net.bytebuddy.matcher.ElementMatchers;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
-import org.apache.beam.sdk.transforms.DoFn.Setup;
-import org.apache.beam.sdk.transforms.DoFn.StartBundle;
-import org.apache.beam.sdk.transforms.DoFn.Teardown;
import org.apache.beam.sdk.util.UserCodeException;
/** Dynamically generates {@link DoFnInvoker} instances for invoking a {@link DoFn}. */
@@ -153,25 +153,13 @@ public class DoFnInvokers {
.method(ElementMatchers.named("invokeProcessElement"))
.intercept(new ProcessElementDelegation(signature.processElement()))
.method(ElementMatchers.named("invokeStartBundle"))
- .intercept(
- signature.startBundle() == null
- ? new NoopMethodImplementation()
- : new BundleMethodDelegation(signature.startBundle()))
+ .intercept(delegateOrNoop(signature.startBundle()))
.method(ElementMatchers.named("invokeFinishBundle"))
- .intercept(
- signature.finishBundle() == null
- ? new NoopMethodImplementation()
- : new BundleMethodDelegation(signature.finishBundle()))
+ .intercept(delegateOrNoop(signature.finishBundle()))
.method(ElementMatchers.named("invokeSetup"))
- .intercept(
- signature.setup() == null
- ? new NoopMethodImplementation()
- : new LifecycleMethodDelegation(signature.setup()))
+ .intercept(delegateOrNoop(signature.setup()))
.method(ElementMatchers.named("invokeTeardown"))
- .intercept(
- signature.teardown() == null
- ? new NoopMethodImplementation()
- : new LifecycleMethodDelegation(signature.teardown()));
+ .intercept(delegateOrNoop(signature.teardown()));
DynamicType.Unloaded<?> unloaded = builder.make();
@@ -184,35 +172,29 @@ public class DoFnInvokers {
return res;
}
- /** Implements an invoker method by doing nothing and immediately returning void. */
- private static class NoopMethodImplementation implements Implementation {
- @Override
- public InstrumentedType prepare(InstrumentedType instrumentedType) {
- return instrumentedType;
- }
-
- @Override
- public ByteCodeAppender appender(final Target implementationTarget) {
- return new ByteCodeAppender() {
- @Override
- public Size apply(
- MethodVisitor methodVisitor,
- Context implementationContext,
- MethodDescription instrumentedMethod) {
- StackManipulation manipulation = MethodReturn.VOID;
- StackManipulation.Size size = manipulation.apply(methodVisitor, implementationContext);
- return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
- }
- };
- }
+ /** Delegates to the given method if available, or does nothing. */
+ private static Implementation delegateOrNoop(DoFnSignature.DoFnMethod method) {
+ return (method == null)
+ ? FixedValue.originType()
+ : new DoFnMethodDelegation(method.targetMethod());
}
/**
- * Base class for implementing an invoker method by delegating to a method of the target {@link
- * DoFn}.
+ * Implements a method of {@link DoFnInvoker} (the "instrumented method") by delegating to a
+ * "target method" of the wrapped {@link DoFn}.
*/
- private abstract static class MethodDelegation implements Implementation {
- FieldDescription delegateField;
+ private static class DoFnMethodDelegation implements Implementation {
+ /** The {@link MethodDescription} of the wrapped {@link DoFn}'s method. */
+ protected final MethodDescription targetMethod;
+ /** Whether the target method returns non-void. */
+ private final boolean targetHasReturn;
+
+ private FieldDescription delegateField;
+
+ public DoFnMethodDelegation(Method targetMethod) {
+ this.targetMethod = new MethodDescription.ForLoadedMethod(targetMethod);
+ targetHasReturn = !TypeDescription.VOID.equals(this.targetMethod.getReturnType().asErasure());
+ }
@Override
public InstrumentedType prepare(InstrumentedType instrumentedType) {
@@ -222,7 +204,6 @@ public class DoFnInvokers {
.getDeclaredFields()
.filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
.getOnly();
-
// Delegating the method call doesn't require any changes to the instrumented type.
return instrumentedType;
}
@@ -230,54 +211,102 @@ public class DoFnInvokers {
@Override
public ByteCodeAppender appender(final Target implementationTarget) {
return new ByteCodeAppender() {
+ /**
+ * @param instrumentedMethod The {@link DoFnInvoker} method for which we're generating code.
+ */
@Override
public Size apply(
MethodVisitor methodVisitor,
Context implementationContext,
MethodDescription instrumentedMethod) {
+ // Figure out how many locals we'll need. This corresponds to "this", the parameters
+ // of the instrumented method, and an argument to hold the return value if the target
+ // method has a return value.
+ int numLocals = 1 + instrumentedMethod.getParameters().size() + (targetHasReturn ? 1 : 0);
+
+ Integer returnVarIndex = null;
+ if (targetHasReturn) {
+ // Local comes after formal parameters, so figure out where that is.
+ returnVarIndex = 1; // "this"
+ for (Type param : Type.getArgumentTypes(instrumentedMethod.getDescriptor())) {
+ returnVarIndex += param.getSize();
+ }
+ }
+
StackManipulation manipulation =
new StackManipulation.Compound(
- // Push "this" reference to the stack
+ // Push "this" (DoFnInvoker on top of the stack)
MethodVariableAccess.REFERENCE.loadOffset(0),
- // Access the delegate field of the the invoker
+ // Access this.delegate (DoFn on top of the stack)
FieldAccess.forField(delegateField).getter(),
- invokeTargetMethod(instrumentedMethod));
+ // Run the beforeDelegation manipulations.
+ // The arguments necessary to invoke the target are on top of the stack.
+ beforeDelegation(instrumentedMethod),
+ // Perform the method delegation.
+ // This will consume the arguments on top of the stack
+ // Either the stack is now empty (because the targetMethod returns void) or the
+ // stack contains the return value.
+ new UserCodeMethodInvocation(returnVarIndex, targetMethod, instrumentedMethod),
+ // Run the afterDelegation manipulations.
+ // Either the stack is now empty (because the instrumentedMethod returns void)
+ // or the stack contains the return value.
+ afterDelegation(instrumentedMethod));
+
StackManipulation.Size size = manipulation.apply(methodVisitor, implementationContext);
- return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
+ return new Size(size.getMaximalSize(), numLocals);
}
};
}
/**
- * Generates code to invoke the target method. When this is called the delegate field will be on
- * top of the stack. This should add any necessary arguments to the stack and then perform the
- * method invocation.
+ * Return the code to the prepare the operand stack for the method delegation.
+ *
+ * <p>Before this method is called, the stack delegate will be the only thing on the stack.
+ *
+ * <p>After this method is called, the stack contents should contain exactly the arguments
+ * necessary to invoke the target method.
*/
- protected abstract StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod);
+ protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
+ return MethodVariableAccess.allArgumentsOf(targetMethod);
+ }
+
+ /**
+ * Return the code to execute after the method delegation.
+ *
+ * <p>Before this method is called, the stack will either be empty (if the target method returns
+ * void) or contain the method return value.
+ *
+ * <p>After this method is called, the stack should either be empty (if the instrumented method
+ * returns void) or contain the value for the instrumented method to return).
+ */
+ protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
+ return TargetMethodAnnotationDrivenBinder.TerminationHandler.Returning.INSTANCE.resolve(
+ Assigner.DEFAULT, instrumentedMethod, targetMethod);
+ }
}
/**
* Implements the invoker's {@link DoFnInvoker#invokeProcessElement} method by delegating to the
* {@link DoFn.ProcessElement} method.
*/
- private static final class ProcessElementDelegation extends MethodDelegation {
- private static final Map<DoFnSignature.ProcessElementMethod.Parameter, MethodDescription>
+ private static final class ProcessElementDelegation extends DoFnMethodDelegation {
+ private static final Map<DoFnSignature.Parameter, MethodDescription>
EXTRA_CONTEXT_FACTORY_METHODS;
static {
try {
- Map<DoFnSignature.ProcessElementMethod.Parameter, MethodDescription> methods =
- new EnumMap<>(DoFnSignature.ProcessElementMethod.Parameter.class);
+ Map<DoFnSignature.Parameter, MethodDescription> methods =
+ new EnumMap<>(DoFnSignature.Parameter.class);
methods.put(
- DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW,
+ DoFnSignature.Parameter.BOUNDED_WINDOW,
new MethodDescription.ForLoadedMethod(
DoFn.ExtraContextFactory.class.getMethod("window")));
methods.put(
- DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER,
+ DoFnSignature.Parameter.INPUT_PROVIDER,
new MethodDescription.ForLoadedMethod(
DoFn.ExtraContextFactory.class.getMethod("inputProvider")));
methods.put(
- DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER,
+ DoFnSignature.Parameter.OUTPUT_RECEIVER,
new MethodDescription.ForLoadedMethod(
DoFn.ExtraContextFactory.class.getMethod("outputReceiver")));
EXTRA_CONTEXT_FACTORY_METHODS = Collections.unmodifiableMap(methods);
@@ -291,16 +320,12 @@ public class DoFnInvokers {
/** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */
private ProcessElementDelegation(DoFnSignature.ProcessElementMethod signature) {
+ super(signature.targetMethod());
this.signature = signature;
}
@Override
- protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
- MethodDescription targetMethod =
- new MethodCall.MethodLocator.ForExplicitMethod(
- new MethodDescription.ForLoadedMethod(signature.targetMethod()))
- .resolve(instrumentedMethod);
-
+ protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
// Parameters of the wrapper invoker method:
// DoFn.ProcessContext, ExtraContextFactory.
// Parameters of the wrapped DoFn method:
@@ -310,144 +335,159 @@ public class DoFnInvokers {
parameters.add(MethodVariableAccess.REFERENCE.loadOffset(1));
// Push the extra arguments in their actual order.
StackManipulation pushExtraContextFactory = MethodVariableAccess.REFERENCE.loadOffset(2);
- for (DoFnSignature.ProcessElementMethod.Parameter param : signature.extraParameters()) {
+ for (DoFnSignature.Parameter param : signature.extraParameters()) {
parameters.add(
new StackManipulation.Compound(
pushExtraContextFactory,
MethodInvocation.invoke(EXTRA_CONTEXT_FACTORY_METHODS.get(param))));
}
+ return new StackManipulation.Compound(parameters);
+ }
- return new StackManipulation.Compound(
- // Push the parameters
- new StackManipulation.Compound(parameters),
- // Invoke the target method
- wrapWithUserCodeException(
- MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
- // Return from the instrumented method
- MethodReturn.VOID);
+ @Override
+ protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
+ return MethodReturn.VOID;
}
}
- /**
- * Implements {@link DoFnInvoker#invokeStartBundle} or {@link DoFnInvoker#invokeFinishBundle} by
- * delegating respectively to the {@link StartBundle} and {@link FinishBundle} methods.
- */
- private static final class BundleMethodDelegation extends MethodDelegation {
- private final DoFnSignature.BundleMethod signature;
+ private static class UserCodeMethodInvocation implements StackManipulation {
- private BundleMethodDelegation(@Nullable DoFnSignature.BundleMethod signature) {
- this.signature = signature;
+ @Nullable private final Integer returnVarIndex;
+ private final MethodDescription targetMethod;
+ private final MethodDescription instrumentedMethod;
+ private final TypeDescription returnType;
+
+ private final Label wrapStart = new Label();
+ private final Label wrapEnd = new Label();
+ private final Label tryBlockStart = new Label();
+ private final Label tryBlockEnd = new Label();
+ private final Label catchBlockStart = new Label();
+ private final Label catchBlockEnd = new Label();
+
+ private final MethodDescription createUserCodeException;
+
+ UserCodeMethodInvocation(
+ @Nullable Integer returnVarIndex,
+ MethodDescription targetMethod,
+ MethodDescription instrumentedMethod) {
+ this.returnVarIndex = returnVarIndex;
+ this.targetMethod = targetMethod;
+ this.instrumentedMethod = instrumentedMethod;
+ this.returnType = targetMethod.getReturnType().asErasure();
+
+ boolean targetMethodReturnsVoid = TypeDescription.VOID.equals(returnType);
+ checkArgument(
+ (returnVarIndex == null) == targetMethodReturnsVoid,
+ "returnVarIndex should be defined if and only if the target method has a return value");
+
+ try {
+ createUserCodeException =
+ new MethodDescription.ForLoadedMethod(
+ UserCodeException.class.getDeclaredMethod("wrap", Throwable.class));
+ } catch (NoSuchMethodException | SecurityException e) {
+ throw new RuntimeException("Unable to find UserCodeException.wrap", e);
+ }
}
@Override
- protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
- MethodDescription targetMethod =
- new MethodCall.MethodLocator.ForExplicitMethod(
- new MethodDescription.ForLoadedMethod(checkNotNull(signature).targetMethod()))
- .resolve(instrumentedMethod);
- return new StackManipulation.Compound(
- // Push the parameters
- MethodVariableAccess.REFERENCE.loadOffset(1),
- // Invoke the target method
- wrapWithUserCodeException(
- MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
- MethodReturn.VOID);
+ public boolean isValid() {
+ return true;
}
- }
- /**
- * Implements {@link DoFnInvoker#invokeSetup} or {@link DoFnInvoker#invokeTeardown} by delegating
- * respectively to the {@link Setup} and {@link Teardown} methods.
- */
- private static final class LifecycleMethodDelegation extends MethodDelegation {
- private final DoFnSignature.LifecycleMethod signature;
+ private Object describeType(Type type) {
+ switch (type.getSort()) {
+ case Type.OBJECT:
+ return type.getDescriptor();
+ case Type.INT:
+ case Type.BYTE:
+ case Type.BOOLEAN:
+ case Type.SHORT:
+ return Opcodes.INTEGER;
+ case Type.LONG:
+ return Opcodes.LONG;
+ case Type.DOUBLE:
+ return Opcodes.DOUBLE;
+ case Type.FLOAT:
+ return Opcodes.FLOAT;
+ default:
+ throw new IllegalArgumentException("Unhandled type as method argument: " + type);
+ }
+ }
- private LifecycleMethodDelegation(@Nullable DoFnSignature.LifecycleMethod signature) {
- this.signature = signature;
+ private void visitFrame(
+ MethodVisitor mv, boolean localsIncludeReturn, @Nullable String stackTop) {
+ boolean hasReturnLocal = (returnVarIndex != null) && localsIncludeReturn;
+
+ Type[] localTypes = Type.getArgumentTypes(instrumentedMethod.getDescriptor());
+ Object[] locals = new Object[1 + localTypes.length + (hasReturnLocal ? 1 : 0)];
+ locals[0] = instrumentedMethod.getReceiverType().asErasure().getInternalName();
+ for (int i = 0; i < localTypes.length; i++) {
+ locals[i + 1] = describeType(localTypes[i]);
+ }
+ if (hasReturnLocal) {
+ locals[locals.length - 1] = returnType.getInternalName();
+ }
+
+ Object[] stack = stackTop == null ? new Object[] {} : new Object[] {stackTop};
+
+ mv.visitFrame(Opcodes.F_NEW, locals.length, locals, stack.length, stack);
}
@Override
- protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
- MethodDescription targetMethod =
- new MethodCall.MethodLocator.ForExplicitMethod(
- new MethodDescription.ForLoadedMethod(checkNotNull(signature).targetMethod()))
- .resolve(instrumentedMethod);
- return new StackManipulation.Compound(
- wrapWithUserCodeException(
- MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
- MethodReturn.VOID);
- }
- }
+ public Size apply(MethodVisitor mv, Context context) {
+ Size size = new Size(0, 0);
- /**
- * Wraps a given stack manipulation in a try catch block. Any exceptions thrown within the try are
- * wrapped with a {@link UserCodeException}.
- */
- private static StackManipulation wrapWithUserCodeException(final StackManipulation tryBody) {
- final MethodDescription createUserCodeException;
- try {
- createUserCodeException =
- new MethodDescription.ForLoadedMethod(
- UserCodeException.class.getDeclaredMethod("wrap", Throwable.class));
- } catch (NoSuchMethodException | SecurityException e) {
- throw new RuntimeException("Unable to find UserCodeException.wrap", e);
- }
+ mv.visitLabel(wrapStart);
+
+ String throwableName = new TypeDescription.ForLoadedType(Throwable.class).getInternalName();
+ mv.visitTryCatchBlock(tryBlockStart, tryBlockEnd, catchBlockStart, throwableName);
- return new StackManipulation() {
- @Override
- public boolean isValid() {
- return tryBody.isValid();
+ // The try block attempts to perform the expected operations, then jumps to success
+ mv.visitLabel(tryBlockStart);
+ size = size.aggregate(MethodInvocation.invoke(targetMethod).apply(mv, context));
+
+ if (returnVarIndex != null) {
+ mv.visitVarInsn(Opcodes.ASTORE, returnVarIndex);
+ size = size.aggregate(new Size(-1, 0)); // Reduces the size of the stack
}
+ mv.visitJumpInsn(Opcodes.GOTO, catchBlockEnd);
+ mv.visitLabel(tryBlockEnd);
+
+ // The handler wraps the exception, and then throws.
+ mv.visitLabel(catchBlockStart);
+ // In catch block, should have same locals and {Throwable} on the stack.
+ visitFrame(mv, false, throwableName);
+
+ // Create the user code exception and throw
+ size =
+ size.aggregate(
+ new Compound(MethodInvocation.invoke(createUserCodeException), Throw.INSTANCE)
+ .apply(mv, context));
+
+ mv.visitLabel(catchBlockEnd);
- @Override
- public Size apply(MethodVisitor mv, Implementation.Context implementationContext) {
- Label tryBlockStart = new Label();
- Label tryBlockEnd = new Label();
- Label catchBlockStart = new Label();
- Label catchBlockEnd = new Label();
-
- String throwableName = new TypeDescription.ForLoadedType(Throwable.class).getInternalName();
- mv.visitTryCatchBlock(tryBlockStart, tryBlockEnd, catchBlockStart, throwableName);
-
- // The try block attempts to perform the expected operations, then jumps to success
- mv.visitLabel(tryBlockStart);
- Size trySize = tryBody.apply(mv, implementationContext);
- mv.visitJumpInsn(Opcodes.GOTO, catchBlockEnd);
- mv.visitLabel(tryBlockEnd);
-
- // The handler wraps the exception, and then throws.
- mv.visitLabel(catchBlockStart);
- // Add the exception to the frame
- mv.visitFrame(
- Opcodes.F_SAME1,
- // No local variables
- 0,
- new Object[] {},
- // 1 stack element (the throwable)
- 1,
- new Object[] {throwableName});
-
- Size catchSize =
- new Compound(MethodInvocation.invoke(createUserCodeException), Throw.INSTANCE)
- .apply(mv, implementationContext);
-
- mv.visitLabel(catchBlockEnd);
- // The frame contents after the try/catch block is the same
- // as it was before.
- mv.visitFrame(
- Opcodes.F_SAME,
- // No local variables
- 0,
- new Object[] {},
- // No new stack variables
- 0,
- new Object[] {});
-
- return new Size(
- trySize.getSizeImpact(),
- Math.max(trySize.getMaximalSize(), catchSize.getMaximalSize()));
+ // After the catch block we should have the return in scope, but nothing on the stack.
+ visitFrame(mv, true, null);
+
+ // After catch block, should have same locals and will have the return on the stack.
+ if (returnVarIndex != null) {
+ mv.visitVarInsn(Opcodes.ALOAD, returnVarIndex);
+ size = size.aggregate(new Size(1, 0)); // Increases the size of the stack
+ }
+ mv.visitLabel(wrapEnd);
+ if (returnVarIndex != null) {
+ // Drop the return type from the locals
+ mv.visitLocalVariable(
+ "res",
+ returnType.getDescriptor(),
+ returnType.getGenericSignature(),
+ wrapStart,
+ wrapEnd,
+ returnVarIndex);
}
- };
+
+ return size;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 181c088..b6864da 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -32,57 +32,74 @@ import org.apache.beam.sdk.transforms.DoFn;
*/
@AutoValue
public abstract class DoFnSignature {
+ /** Class of the original {@link DoFn} from which this signature was produced. */
public abstract Class<? extends DoFn> fnClass();
+ /** Details about this {@link DoFn}'s {@link DoFn.ProcessElement} method. */
public abstract ProcessElementMethod processElement();
+ /** Details about this {@link DoFn}'s {@link DoFn.StartBundle} method. */
@Nullable
public abstract BundleMethod startBundle();
+ /** Details about this {@link DoFn}'s {@link DoFn.FinishBundle} method. */
@Nullable
public abstract BundleMethod finishBundle();
+ /** Details about this {@link DoFn}'s {@link DoFn.Setup} method. */
@Nullable
public abstract LifecycleMethod setup();
+ /** Details about this {@link DoFn}'s {@link DoFn.Teardown} method. */
@Nullable
public abstract LifecycleMethod teardown();
- static DoFnSignature create(
- Class<? extends DoFn> fnClass,
- ProcessElementMethod processElement,
- @Nullable BundleMethod startBundle,
- @Nullable BundleMethod finishBundle,
- @Nullable LifecycleMethod setup,
- @Nullable LifecycleMethod teardown) {
- return new AutoValue_DoFnSignature(
- fnClass,
- processElement,
- startBundle,
- finishBundle,
- setup,
- teardown);
+ static Builder builder() {
+ return new AutoValue_DoFnSignature.Builder();
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setFnClass(Class<? extends DoFn> fnClass);
+ abstract Builder setProcessElement(ProcessElementMethod processElement);
+ abstract Builder setStartBundle(BundleMethod startBundle);
+ abstract Builder setFinishBundle(BundleMethod finishBundle);
+ abstract Builder setSetup(LifecycleMethod setup);
+ abstract Builder setTeardown(LifecycleMethod teardown);
+ abstract DoFnSignature build();
+ }
+
+ /** A method delegated to a annotated method of an underlying {@link DoFn}. */
+ public interface DoFnMethod {
+ /** The annotated method itself. */
+ Method targetMethod();
+ }
+
+ /** A type of optional parameter of the {@link DoFn.ProcessElement} method. */
+ public enum Parameter {
+ BOUNDED_WINDOW,
+ INPUT_PROVIDER,
+ OUTPUT_RECEIVER,
}
/** Describes a {@link DoFn.ProcessElement} method. */
@AutoValue
- public abstract static class ProcessElementMethod {
- enum Parameter {
- BOUNDED_WINDOW,
- INPUT_PROVIDER,
- OUTPUT_RECEIVER
- }
-
+ public abstract static class ProcessElementMethod implements DoFnMethod {
+ /** The annotated method itself. */
+ @Override
public abstract Method targetMethod();
+ /** Types of optional parameters of the annotated method, in the order they appear. */
public abstract List<Parameter> extraParameters();
- static ProcessElementMethod create(Method targetMethod, List<Parameter> extraParameters) {
+ static ProcessElementMethod create(
+ Method targetMethod,
+ List<Parameter> extraParameters) {
return new AutoValue_DoFnSignature_ProcessElementMethod(
targetMethod, Collections.unmodifiableList(extraParameters));
}
- /** @return true if the reflected {@link DoFn} uses a Single Window. */
+ /** Whether this {@link DoFn} uses a Single Window. */
public boolean usesSingleWindow() {
return extraParameters().contains(Parameter.BOUNDED_WINDOW);
}
@@ -90,7 +107,9 @@ public abstract class DoFnSignature {
/** Describes a {@link DoFn.StartBundle} or {@link DoFn.FinishBundle} method. */
@AutoValue
- public abstract static class BundleMethod {
+ public abstract static class BundleMethod implements DoFnMethod {
+ /** The annotated method itself. */
+ @Override
public abstract Method targetMethod();
static BundleMethod create(Method targetMethod) {
@@ -100,7 +119,9 @@ public abstract class DoFnSignature {
/** Describes a {@link DoFn.Setup} or {@link DoFn.Teardown} method. */
@AutoValue
- public abstract static class LifecycleMethod {
+ public abstract static class LifecycleMethod implements DoFnMethod {
+ /** The annotated method itself. */
+ @Override
public abstract Method targetMethod();
static LifecycleMethod create(Method targetMethod) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 7e482d5..8283788 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -17,9 +17,6 @@
*/
package org.apache.beam.sdk.transforms.reflect;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.reflect.TypeParameter;
import com.google.common.reflect.TypeToken;
@@ -36,6 +33,7 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.common.ReflectHelpers;
@@ -62,15 +60,17 @@ public class DoFnSignatures {
/** Analyzes a given {@link DoFn} class and extracts its {@link DoFnSignature}. */
private static DoFnSignature parseSignature(Class<? extends DoFn> fnClass) {
- TypeToken<?> inputT = null;
- TypeToken<?> outputT = null;
+ DoFnSignature.Builder builder = DoFnSignature.builder();
+
+ ErrorReporter errors = new ErrorReporter(null, fnClass.getName());
+ errors.checkArgument(DoFn.class.isAssignableFrom(fnClass), "Must be subtype of DoFn");
+ builder.setFnClass(fnClass);
- // Extract the input and output type.
- checkArgument(
- DoFn.class.isAssignableFrom(fnClass),
- "%s must be subtype of DoFn",
- fnClass.getSimpleName());
TypeToken<? extends DoFn> fnToken = TypeToken.of(fnClass);
+
+ // Extract the input and output type, and whether the fn is bounded.
+ TypeToken<?> inputT = null;
+ TypeToken<?> outputT = null;
for (TypeToken<?> supertype : fnToken.getTypes()) {
if (!supertype.getRawType().equals(DoFn.class)) {
continue;
@@ -79,25 +79,48 @@ public class DoFnSignatures {
inputT = TypeToken.of(args[0]);
outputT = TypeToken.of(args[1]);
}
- checkNotNull(inputT, "Unable to determine input type from %s", fnClass);
-
- Method processElementMethod = findAnnotatedMethod(DoFn.ProcessElement.class, fnClass, true);
- Method startBundleMethod = findAnnotatedMethod(DoFn.StartBundle.class, fnClass, false);
- Method finishBundleMethod = findAnnotatedMethod(DoFn.FinishBundle.class, fnClass, false);
- Method setupMethod = findAnnotatedMethod(DoFn.Setup.class, fnClass, false);
- Method teardownMethod = findAnnotatedMethod(DoFn.Teardown.class, fnClass, false);
-
- return DoFnSignature.create(
- fnClass,
- analyzeProcessElementMethod(fnToken, processElementMethod, inputT, outputT),
- (startBundleMethod == null)
- ? null
- : analyzeBundleMethod(fnToken, startBundleMethod, inputT, outputT),
- (finishBundleMethod == null)
- ? null
- : analyzeBundleMethod(fnToken, finishBundleMethod, inputT, outputT),
- (setupMethod == null) ? null : analyzeLifecycleMethod(setupMethod),
- (teardownMethod == null) ? null : analyzeLifecycleMethod(teardownMethod));
+ errors.checkNotNull(inputT, "Unable to determine input type");
+
+ Method processElementMethod =
+ findAnnotatedMethod(errors, DoFn.ProcessElement.class, fnClass, true);
+ Method startBundleMethod = findAnnotatedMethod(errors, DoFn.StartBundle.class, fnClass, false);
+ Method finishBundleMethod =
+ findAnnotatedMethod(errors, DoFn.FinishBundle.class, fnClass, false);
+ Method setupMethod = findAnnotatedMethod(errors, DoFn.Setup.class, fnClass, false);
+ Method teardownMethod = findAnnotatedMethod(errors, DoFn.Teardown.class, fnClass, false);
+
+ ErrorReporter processElementErrors =
+ errors.forMethod(DoFn.ProcessElement.class, processElementMethod);
+ DoFnSignature.ProcessElementMethod processElement =
+ analyzeProcessElementMethod(
+ processElementErrors, fnToken, processElementMethod, inputT, outputT);
+ builder.setProcessElement(processElement);
+
+ if (startBundleMethod != null) {
+ ErrorReporter startBundleErrors = errors.forMethod(DoFn.StartBundle.class, startBundleMethod);
+ builder.setStartBundle(
+ analyzeBundleMethod(startBundleErrors, fnToken, startBundleMethod, inputT, outputT));
+ }
+
+ if (finishBundleMethod != null) {
+ ErrorReporter finishBundleErrors =
+ errors.forMethod(DoFn.FinishBundle.class, finishBundleMethod);
+ builder.setFinishBundle(
+ analyzeBundleMethod(finishBundleErrors, fnToken, finishBundleMethod, inputT, outputT));
+ }
+
+ if (setupMethod != null) {
+ builder.setSetup(
+ analyzeLifecycleMethod(errors.forMethod(DoFn.Setup.class, setupMethod), setupMethod));
+ }
+
+ if (teardownMethod != null) {
+ builder.setTeardown(
+ analyzeLifecycleMethod(
+ errors.forMethod(DoFn.Teardown.class, teardownMethod), teardownMethod));
+ }
+
+ return builder.build();
}
/**
@@ -139,10 +162,12 @@ public class DoFnSignatures {
@VisibleForTesting
static DoFnSignature.ProcessElementMethod analyzeProcessElementMethod(
- TypeToken<? extends DoFn> fnClass, Method m, TypeToken<?> inputT, TypeToken<?> outputT) {
- checkArgument(
- void.class.equals(m.getReturnType()), "%s must have a void return type", format(m));
- checkArgument(!m.isVarArgs(), "%s must not have var args", format(m));
+ ErrorReporter errors,
+ TypeToken<? extends DoFn> fnClass,
+ Method m,
+ TypeToken<?> inputT,
+ TypeToken<?> outputT) {
+ errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void");
TypeToken<?> processContextToken = doFnProcessContextTypeOf(inputT, outputT);
@@ -151,57 +176,49 @@ public class DoFnSignatures {
if (params.length > 0) {
contextToken = fnClass.resolveType(params[0]);
}
- checkArgument(
+ errors.checkArgument(
contextToken != null && contextToken.equals(processContextToken),
- "%s must take a %s as its first argument",
- format(m),
+ "Must take %s as the first argument",
formatType(processContextToken));
- List<DoFnSignature.ProcessElementMethod.Parameter> extraParameters = new ArrayList<>();
+ List<DoFnSignature.Parameter> extraParameters = new ArrayList<>();
+
TypeToken<?> expectedInputProviderT = inputProviderTypeOf(inputT);
TypeToken<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
for (int i = 1; i < params.length; ++i) {
- TypeToken<?> param = fnClass.resolveType(params[i]);
- Class<?> rawType = param.getRawType();
+ TypeToken<?> paramT = fnClass.resolveType(params[i]);
+ Class<?> rawType = paramT.getRawType();
if (rawType.equals(BoundedWindow.class)) {
- checkArgument(
- !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW),
- "Multiple BoundedWindow parameters in %s",
- format(m));
- extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW);
+ errors.checkArgument(
+ !extraParameters.contains(DoFnSignature.Parameter.BOUNDED_WINDOW),
+ "Multiple BoundedWindow parameters");
+ extraParameters.add(DoFnSignature.Parameter.BOUNDED_WINDOW);
} else if (rawType.equals(DoFn.InputProvider.class)) {
- checkArgument(
- !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER),
- "Multiple InputProvider parameters in %s",
- format(m));
- checkArgument(
- param.equals(expectedInputProviderT),
- "Wrong type of InputProvider parameter for method %s: %s, should be %s",
- format(m),
- formatType(param),
+ errors.checkArgument(
+ !extraParameters.contains(DoFnSignature.Parameter.INPUT_PROVIDER),
+ "Multiple InputProvider parameters");
+ errors.checkArgument(
+ paramT.equals(expectedInputProviderT),
+ "Wrong type of InputProvider parameter: %s, should be %s",
+ formatType(paramT),
formatType(expectedInputProviderT));
- extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER);
+ extraParameters.add(DoFnSignature.Parameter.INPUT_PROVIDER);
} else if (rawType.equals(DoFn.OutputReceiver.class)) {
- checkArgument(
- !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER),
- "Multiple OutputReceiver parameters in %s",
- format(m));
- checkArgument(
- param.equals(expectedOutputReceiverT),
- "Wrong type of OutputReceiver parameter for method %s: %s, should be %s",
- format(m),
- formatType(param),
+ errors.checkArgument(
+ !extraParameters.contains(DoFnSignature.Parameter.OUTPUT_RECEIVER),
+ "Multiple OutputReceiver parameters");
+ errors.checkArgument(
+ paramT.equals(expectedOutputReceiverT),
+ "Wrong type of OutputReceiver parameter: %s, should be %s",
+ formatType(paramT),
formatType(expectedOutputReceiverT));
- extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER);
+ extraParameters.add(DoFnSignature.Parameter.OUTPUT_RECEIVER);
} else {
List<String> allowedParamTypes =
Arrays.asList(formatType(new TypeToken<BoundedWindow>() {}));
- checkArgument(
- false,
- "%s is not a valid context parameter for method %s. Should be one of %s",
- formatType(param),
- format(m),
- allowedParamTypes);
+ errors.throwIllegalArgument(
+ "%s is not a valid context parameter. Should be one of %s",
+ formatType(paramT), allowedParamTypes);
}
}
@@ -210,35 +227,25 @@ public class DoFnSignatures {
@VisibleForTesting
static DoFnSignature.BundleMethod analyzeBundleMethod(
- TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT, TypeToken<?> outputT) {
- checkArgument(
- void.class.equals(m.getReturnType()), "%s must have a void return type", format(m));
- checkArgument(!m.isVarArgs(), "%s must not have var args", format(m));
-
+ ErrorReporter errors,
+ TypeToken<? extends DoFn> fnToken,
+ Method m,
+ TypeToken<?> inputT,
+ TypeToken<?> outputT) {
+ errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void");
TypeToken<?> expectedContextToken = doFnContextTypeOf(inputT, outputT);
-
Type[] params = m.getGenericParameterTypes();
- checkArgument(
- params.length == 1,
- "%s must have a single argument of type %s",
- format(m),
+ errors.checkArgument(
+ params.length == 1 && fnToken.resolveType(params[0]).equals(expectedContextToken),
+ "Must take a single argument of type %s",
formatType(expectedContextToken));
- TypeToken<?> contextToken = fnToken.resolveType(params[0]);
- checkArgument(
- contextToken.equals(expectedContextToken),
- "Wrong type of context argument to %s: %s, must be %s",
- format(m),
- formatType(contextToken),
- formatType(expectedContextToken));
-
return DoFnSignature.BundleMethod.create(m);
}
- private static DoFnSignature.LifecycleMethod analyzeLifecycleMethod(Method m) {
- checkArgument(
- void.class.equals(m.getReturnType()), "%s must have a void return type", format(m));
- checkArgument(
- m.getGenericParameterTypes().length == 0, "%s must take zero arguments", format(m));
+ private static DoFnSignature.LifecycleMethod analyzeLifecycleMethod(
+ ErrorReporter errors, Method m) {
+ errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void");
+ errors.checkArgument(m.getGenericParameterTypes().length == 0, "Must take zero arguments");
return DoFnSignature.LifecycleMethod.create(m);
}
@@ -272,15 +279,11 @@ public class DoFnSignatures {
}
private static Method findAnnotatedMethod(
- Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
+ ErrorReporter errors, Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
Collection<Method> matches = declaredMethodsWithAnnotation(anno, fnClazz, DoFn.class);
if (matches.size() == 0) {
- checkArgument(
- !required,
- "No method annotated with @%s found in %s",
- anno.getSimpleName(),
- fnClazz.getName());
+ errors.checkArgument(!required, "No method annotated with @%s found", anno.getSimpleName());
return null;
}
@@ -289,7 +292,7 @@ public class DoFnSignatures {
// classes).
Method first = matches.iterator().next();
for (Method other : matches) {
- checkArgument(
+ errors.checkArgument(
first.getName().equals(other.getName())
&& Arrays.equals(first.getParameterTypes(), other.getParameterTypes()),
"Found multiple methods annotated with @%s. [%s] and [%s]",
@@ -298,22 +301,50 @@ public class DoFnSignatures {
format(other));
}
+ ErrorReporter methodErrors = errors.forMethod(anno, first);
// We need to be able to call it. We require it is public.
- checkArgument(
- (first.getModifiers() & Modifier.PUBLIC) != 0, "%s must be public", format(first));
-
+ methodErrors.checkArgument((first.getModifiers() & Modifier.PUBLIC) != 0, "Must be public");
// And make sure its not static.
- checkArgument(
- (first.getModifiers() & Modifier.STATIC) == 0, "%s must not be static", format(first));
+ methodErrors.checkArgument((first.getModifiers() & Modifier.STATIC) == 0, "Must not be static");
return first;
}
- private static String format(Method m) {
- return ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(m);
+ private static String format(Method method) {
+ return ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(method);
}
private static String formatType(TypeToken<?> t) {
return ReflectHelpers.TYPE_SIMPLE_DESCRIPTION.apply(t.getType());
}
+
+ static class ErrorReporter {
+ private final String label;
+
+ ErrorReporter(@Nullable ErrorReporter root, String label) {
+ this.label = (root == null) ? label : String.format("%s, %s", root.label, label);
+ }
+
+ ErrorReporter forMethod(Class<? extends Annotation> annotation, Method method) {
+ return new ErrorReporter(
+ this,
+ String.format("@%s %s", annotation, (method == null) ? "(absent)" : format(method)));
+ }
+
+ void throwIllegalArgument(String message, Object... args) {
+ throw new IllegalArgumentException(label + ": " + String.format(message, args));
+ }
+
+ public void checkArgument(boolean condition, String message, Object... args) {
+ if (!condition) {
+ throwIllegalArgument(message, args);
+ }
+ }
+
+ public void checkNotNull(Object value, String message, Object... args) {
+ if (value == null) {
+ throwIllegalArgument(message, args);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
index 2034eba..2d92162 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
@@ -33,8 +33,6 @@ import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.lang.reflect.WildcardType;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedHashSet;
import java.util.Queue;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -153,26 +151,6 @@ public class ReflectHelpers {
};
/**
- * Returns all interfaces of the given clazz.
- * @param clazz
- * @return
- */
- public static FluentIterable<Class<?>> getClosureOfInterfaces(Class<?> clazz) {
- checkNotNull(clazz);
- Queue<Class<?>> interfacesToProcess = Queues.newArrayDeque();
- Collections.addAll(interfacesToProcess, clazz.getInterfaces());
-
- LinkedHashSet<Class<?>> interfaces = new LinkedHashSet<>();
- while (!interfacesToProcess.isEmpty()) {
- Class<?> current = interfacesToProcess.remove();
- if (interfaces.add(current)) {
- Collections.addAll(interfacesToProcess, current.getInterfaces());
- }
- }
- return FluentIterable.from(interfaces);
- }
-
- /**
* Returns all the methods visible from the provided interfaces.
*
* @param interfaces The interfaces to use when searching for all their methods.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 9317ea2..e59cce8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -17,11 +17,12 @@
*/
package org.apache.beam.sdk.transforms.reflect;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.UserCodeException;
import org.junit.Before;
@@ -36,20 +37,6 @@ import org.mockito.MockitoAnnotations;
/** Tests for {@link DoFnInvokers}. */
@RunWith(JUnit4.class)
public class DoFnInvokersTest {
- /** A convenience struct holding flags that indicate whether a particular method was invoked. */
- public static class Invocations {
- public boolean wasProcessElementInvoked = false;
- public boolean wasStartBundleInvoked = false;
- public boolean wasFinishBundleInvoked = false;
- public boolean wasSetupInvoked = false;
- public boolean wasTeardownInvoked = false;
- private final String name;
-
- public Invocations(String name) {
- this.name = name;
- }
- }
-
@Rule public ExpectedException thrown = ExpectedException.none();
@Mock private DoFn.ProcessContext mockContext;
@@ -81,102 +68,10 @@ public class DoFnInvokersTest {
};
}
- private void checkInvokeProcessElementWorks(DoFn<String, String> fn, Invocations... invocations)
- throws Exception {
- assertTrue("Need at least one invocation to check", invocations.length >= 1);
- for (Invocations invocation : invocations) {
- assertFalse(
- "Should not yet have called processElement on " + invocation.name,
- invocation.wasProcessElementInvoked);
- }
+ private void invokeProcessElement(DoFn<String, String> fn) {
DoFnInvokers.INSTANCE
.newByteBuddyInvoker(fn)
.invokeProcessElement(mockContext, extraContextFactory);
- for (Invocations invocation : invocations) {
- assertTrue(
- "Should have called processElement on " + invocation.name,
- invocation.wasProcessElementInvoked);
- }
- }
-
- private void checkInvokeStartBundleWorks(DoFn<String, String> fn, Invocations... invocations)
- throws Exception {
- assertTrue("Need at least one invocation to check", invocations.length >= 1);
- for (Invocations invocation : invocations) {
- assertFalse(
- "Should not yet have called startBundle on " + invocation.name,
- invocation.wasStartBundleInvoked);
- }
- DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeStartBundle(mockContext);
- for (Invocations invocation : invocations) {
- assertTrue(
- "Should have called startBundle on " + invocation.name, invocation.wasStartBundleInvoked);
- }
- }
-
- private void checkInvokeFinishBundleWorks(DoFn<String, String> fn, Invocations... invocations)
- throws Exception {
- assertTrue("Need at least one invocation to check", invocations.length >= 1);
- for (Invocations invocation : invocations) {
- assertFalse(
- "Should not yet have called finishBundle on " + invocation.name,
- invocation.wasFinishBundleInvoked);
- }
- DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeFinishBundle(mockContext);
- for (Invocations invocation : invocations) {
- assertTrue(
- "Should have called finishBundle on " + invocation.name,
- invocation.wasFinishBundleInvoked);
- }
- }
-
- private void checkInvokeSetupWorks(DoFn<String, String> fn, Invocations... invocations)
- throws Exception {
- assertTrue("Need at least one invocation to check", invocations.length >= 1);
- for (Invocations invocation : invocations) {
- assertFalse(
- "Should not yet have called setup on " + invocation.name, invocation.wasSetupInvoked);
- }
- DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeSetup();
- for (Invocations invocation : invocations) {
- assertTrue("Should have called setup on " + invocation.name, invocation.wasSetupInvoked);
- }
- }
-
- private void checkInvokeTeardownWorks(DoFn<String, String> fn, Invocations... invocations)
- throws Exception {
- assertTrue("Need at least one invocation to check", invocations.length >= 1);
- for (Invocations invocation : invocations) {
- assertFalse(
- "Should not yet have called teardown on " + invocation.name,
- invocation.wasTeardownInvoked);
- }
- DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeTeardown();
- for (Invocations invocation : invocations) {
- assertTrue(
- "Should have called teardown on " + invocation.name, invocation.wasTeardownInvoked);
- }
- }
-
- @Test
- public void testDoFnWithNoExtraContext() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFn<String, String> fn =
- new DoFn<String, String>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- invocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- }
- };
-
- assertFalse(
- DoFnSignatures.INSTANCE
- .getOrParseSignature(fn.getClass())
- .processElement()
- .usesSingleWindow());
-
- checkInvokeProcessElementWorks(fn, invocations);
}
@Test
@@ -190,6 +85,21 @@ public class DoFnInvokersTest {
DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn2).getClass());
}
+ // ---------------------------------------------------------------------------------------
+ // Tests for general invocations of DoFn methods.
+ // ---------------------------------------------------------------------------------------
+
+ @Test
+ public void testDoFnWithNoExtraContext() throws Exception {
+ class MockFn extends DoFn<String, String> {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {}
+ }
+ MockFn fn = mock(MockFn.class);
+ invokeProcessElement(fn);
+ verify(fn).processElement(mockContext);
+ }
+
interface InterfaceWithProcessElement {
@DoFn.ProcessElement
void processElement(DoFn<String, String>.ProcessContext c);
@@ -199,302 +109,221 @@ public class DoFnInvokersTest {
private class IdentityUsingInterfaceWithProcessElement extends DoFn<String, String>
implements LayersOfInterfaces {
-
- private Invocations invocations = new Invocations("Named Class");
-
@Override
- public void processElement(DoFn<String, String>.ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- }
+ public void processElement(DoFn<String, String>.ProcessContext c) {}
}
@Test
public void testDoFnWithProcessElementInterface() throws Exception {
- IdentityUsingInterfaceWithProcessElement fn = new IdentityUsingInterfaceWithProcessElement();
- assertFalse(
- DoFnSignatures.INSTANCE
- .getOrParseSignature(fn.getClass())
- .processElement()
- .usesSingleWindow());
- checkInvokeProcessElementWorks(fn, fn.invocations);
+ IdentityUsingInterfaceWithProcessElement fn =
+ mock(IdentityUsingInterfaceWithProcessElement.class);
+ invokeProcessElement(fn);
+ verify(fn).processElement(mockContext);
}
private class IdentityParent extends DoFn<String, String> {
- protected Invocations parentInvocations = new Invocations("IdentityParent");
-
@ProcessElement
- public void process(ProcessContext c) {
- parentInvocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- }
+ public void process(ProcessContext c) {}
}
private class IdentityChildWithoutOverride extends IdentityParent {}
private class IdentityChildWithOverride extends IdentityParent {
- protected Invocations childInvocations = new Invocations("IdentityChildWithOverride");
-
@Override
public void process(DoFn<String, String>.ProcessContext c) {
super.process(c);
- childInvocations.wasProcessElementInvoked = true;
}
}
@Test
public void testDoFnWithMethodInSuperclass() throws Exception {
- IdentityChildWithoutOverride fn = new IdentityChildWithoutOverride();
- assertFalse(
- DoFnSignatures.INSTANCE
- .getOrParseSignature(fn.getClass())
- .processElement()
- .usesSingleWindow());
- checkInvokeProcessElementWorks(fn, fn.parentInvocations);
+ IdentityChildWithoutOverride fn = mock(IdentityChildWithoutOverride.class);
+ invokeProcessElement(fn);
+ verify(fn).process(mockContext);
}
@Test
public void testDoFnWithMethodInSubclass() throws Exception {
- IdentityChildWithOverride fn = new IdentityChildWithOverride();
- assertFalse(
- DoFnSignatures.INSTANCE
- .getOrParseSignature(fn.getClass())
- .processElement()
- .usesSingleWindow());
- checkInvokeProcessElementWorks(fn, fn.parentInvocations, fn.childInvocations);
+ IdentityChildWithOverride fn = mock(IdentityChildWithOverride.class);
+ invokeProcessElement(fn);
+ verify(fn).process(mockContext);
}
@Test
public void testDoFnWithWindow() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFn<String, String> fn =
- new DoFn<String, String>() {
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow w) throws Exception {
- invocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- assertSame(w, mockWindow);
- }
- };
-
- assertTrue(
- DoFnSignatures.INSTANCE
- .getOrParseSignature(fn.getClass())
- .processElement()
- .usesSingleWindow());
-
- checkInvokeProcessElementWorks(fn, invocations);
+ class MockFn extends DoFn<String, String> {
+ @DoFn.ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow w) throws Exception {}
+ }
+ MockFn fn = mock(MockFn.class);
+ invokeProcessElement(fn);
+ verify(fn).processElement(mockContext, mockWindow);
}
@Test
public void testDoFnWithOutputReceiver() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFn<String, String> fn =
- new DoFn<String, String>() {
- @ProcessElement
- public void processElement(ProcessContext c, OutputReceiver<String> o) throws Exception {
- invocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- assertSame(o, mockOutputReceiver);
- }
- };
-
- assertFalse(
- DoFnSignatures.INSTANCE
- .getOrParseSignature(fn.getClass())
- .processElement()
- .usesSingleWindow());
-
- checkInvokeProcessElementWorks(fn, invocations);
+ class MockFn extends DoFn<String, String> {
+ @DoFn.ProcessElement
+ public void processElement(ProcessContext c, OutputReceiver<String> o) throws Exception {}
+ }
+ MockFn fn = mock(MockFn.class);
+ invokeProcessElement(fn);
+ verify(fn).processElement(mockContext, mockOutputReceiver);
}
@Test
public void testDoFnWithInputProvider() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFn<String, String> fn =
- new DoFn<String, String>() {
- @ProcessElement
- public void processElement(ProcessContext c, InputProvider<String> i) throws Exception {
- invocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- assertSame(i, mockInputProvider);
- }
- };
-
- assertFalse(
- DoFnSignatures.INSTANCE
- .getOrParseSignature(fn.getClass())
- .processElement()
- .usesSingleWindow());
-
- checkInvokeProcessElementWorks(fn, invocations);
- }
-
- @Test
- public void testDoFnWithStartBundle() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFn<String, String> fn =
- new DoFn<String, String>() {
- @ProcessElement
- public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
-
- @StartBundle
- public void startBundle(Context c) {
- invocations.wasStartBundleInvoked = true;
- assertSame(c, mockContext);
- }
-
- @FinishBundle
- public void finishBundle(Context c) {
- invocations.wasFinishBundleInvoked = true;
- assertSame(c, mockContext);
- }
- };
-
- checkInvokeStartBundleWorks(fn, invocations);
- checkInvokeFinishBundleWorks(fn, invocations);
+ class MockFn extends DoFn<String, String> {
+ @DoFn.ProcessElement
+ public void processElement(ProcessContext c, InputProvider<String> o) throws Exception {}
+ }
+ MockFn fn = mock(MockFn.class);
+ invokeProcessElement(fn);
+ verify(fn).processElement(mockContext, mockInputProvider);
}
@Test
- public void testDoFnWithSetupTeardown() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFn<String, String> fn =
- new DoFn<String, String>() {
- @ProcessElement
- public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
-
- @StartBundle
- public void startBundle(Context c) {
- invocations.wasStartBundleInvoked = true;
- assertSame(c, mockContext);
- }
+ public void testDoFnWithStartBundleSetupTeardown() throws Exception {
+ class MockFn extends DoFn<String, String> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {}
- @FinishBundle
- public void finishBundle(Context c) {
- invocations.wasFinishBundleInvoked = true;
- assertSame(c, mockContext);
- }
+ @StartBundle
+ public void startBundle(Context c) {}
- @Setup
- public void before() {
- invocations.wasSetupInvoked = true;
- }
+ @FinishBundle
+ public void finishBundle(Context c) {}
- @Teardown
- public void after() {
- invocations.wasTeardownInvoked = true;
- }
- };
+ @Setup
+ public void before() {}
- checkInvokeSetupWorks(fn, invocations);
- checkInvokeTeardownWorks(fn, invocations);
- }
+ @Teardown
+ public void after() {}
+ }
+ MockFn fn = mock(MockFn.class);
+ DoFnInvoker<String, String> invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+ invoker.invokeSetup();
+ invoker.invokeStartBundle(mockContext);
+ invoker.invokeFinishBundle(mockContext);
+ invoker.invokeTeardown();
+ verify(fn).before();
+ verify(fn).startBundle(mockContext);
+ verify(fn).finishBundle(mockContext);
+ verify(fn).after();
+ }
+
+ // ---------------------------------------------------------------------------------------
+ // Tests for ability to invoke private, inner and anonymous classes.
+ // ---------------------------------------------------------------------------------------
private static class PrivateDoFnClass extends DoFn<String, String> {
- final Invocations invocations = new Invocations(getClass().getName());
-
@ProcessElement
- public void processThis(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
+ public void processThis(ProcessContext c) {}
}
@Test
public void testLocalPrivateDoFnClass() throws Exception {
- PrivateDoFnClass fn = new PrivateDoFnClass();
- checkInvokeProcessElementWorks(fn, fn.invocations);
+ PrivateDoFnClass fn = mock(PrivateDoFnClass.class);
+ invokeProcessElement(fn);
+ verify(fn).processThis(mockContext);
}
@Test
public void testStaticPackagePrivateDoFnClass() throws Exception {
- Invocations invocations = new Invocations("StaticPackagePrivateDoFn");
- checkInvokeProcessElementWorks(
- DoFnInvokersTestHelper.newStaticPackagePrivateDoFn(invocations), invocations);
+ DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFn().getClass());
+ invokeProcessElement(fn);
+ DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFn(fn, mockContext);
}
@Test
public void testInnerPackagePrivateDoFnClass() throws Exception {
- Invocations invocations = new Invocations("InnerPackagePrivateDoFn");
- checkInvokeProcessElementWorks(
- new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn(invocations), invocations);
+ DoFn<String, String> fn =
+ mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn().getClass());
+ invokeProcessElement(fn);
+ DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFn(fn, mockContext);
}
@Test
public void testStaticPrivateDoFnClass() throws Exception {
- Invocations invocations = new Invocations("StaticPrivateDoFn");
- checkInvokeProcessElementWorks(
- DoFnInvokersTestHelper.newStaticPrivateDoFn(invocations), invocations);
+ DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPrivateDoFn().getClass());
+ invokeProcessElement(fn);
+ DoFnInvokersTestHelper.verifyStaticPrivateDoFn(fn, mockContext);
}
@Test
public void testInnerPrivateDoFnClass() throws Exception {
- Invocations invocations = new Invocations("StaticInnerDoFn");
- checkInvokeProcessElementWorks(
- new DoFnInvokersTestHelper().newInnerPrivateDoFn(invocations), invocations);
+ DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerPrivateDoFn().getClass());
+ invokeProcessElement(fn);
+ DoFnInvokersTestHelper.verifyInnerPrivateDoFn(fn, mockContext);
}
@Test
- public void testAnonymousInnerDoFnInOtherPackage() throws Exception {
- Invocations invocations = new Invocations("AnonymousInnerDoFnInOtherPackage");
- checkInvokeProcessElementWorks(
- new DoFnInvokersTestHelper().newInnerAnonymousDoFn(invocations), invocations);
+ public void testAnonymousInnerDoFn() throws Exception {
+ DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFn().getClass());
+ invokeProcessElement(fn);
+ DoFnInvokersTestHelper.verifyInnerAnonymousDoFn(fn, mockContext);
}
@Test
public void testStaticAnonymousDoFnInOtherPackage() throws Exception {
- Invocations invocations = new Invocations("AnonymousStaticDoFnInOtherPackage");
- checkInvokeProcessElementWorks(
- DoFnInvokersTestHelper.newStaticAnonymousDoFn(invocations), invocations);
+ // Can't use mockito for this one - the anonymous class is final and can't be mocked.
+ DoFn<String, String> fn = DoFnInvokersTestHelper.newStaticAnonymousDoFn();
+ invokeProcessElement(fn);
+ DoFnInvokersTestHelper.verifyStaticAnonymousDoFnInvoked(fn, mockContext);
}
+ // ---------------------------------------------------------------------------------------
+ // Tests for wrapping exceptions.
+ // ---------------------------------------------------------------------------------------
+
@Test
public void testProcessElementException() throws Exception {
- DoFn<Integer, Integer> fn =
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(@SuppressWarnings("unused") ProcessContext c) {
- throw new IllegalArgumentException("bogus");
- }
- };
-
+ DoFnInvoker<Integer, Integer> invoker =
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(
+ new DoFn<Integer, Integer>() {
+ @ProcessElement
+ public void processElement(@SuppressWarnings("unused") ProcessContext c) {
+ throw new IllegalArgumentException("bogus");
+ }
+ });
thrown.expect(UserCodeException.class);
thrown.expectMessage("bogus");
- DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeProcessElement(null, null);
+ invoker.invokeProcessElement(null, null);
}
@Test
public void testStartBundleException() throws Exception {
- DoFn<Integer, Integer> fn =
- new DoFn<Integer, Integer>() {
- @StartBundle
- public void startBundle(@SuppressWarnings("unused") Context c) {
- throw new IllegalArgumentException("bogus");
- }
-
- @ProcessElement
- public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
- };
-
+ DoFnInvoker<Integer, Integer> invoker =
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(
+ new DoFn<Integer, Integer>() {
+ @StartBundle
+ public void startBundle(@SuppressWarnings("unused") Context c) {
+ throw new IllegalArgumentException("bogus");
+ }
+
+ @ProcessElement
+ public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+ });
thrown.expect(UserCodeException.class);
thrown.expectMessage("bogus");
- DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeStartBundle(null);
+ invoker.invokeStartBundle(null);
}
@Test
public void testFinishBundleException() throws Exception {
- DoFn<Integer, Integer> fn =
- new DoFn<Integer, Integer>() {
- @FinishBundle
- public void finishBundle(@SuppressWarnings("unused") Context c) {
- throw new IllegalArgumentException("bogus");
- }
-
- @ProcessElement
- public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
- };
-
+ DoFnInvoker<Integer, Integer> invoker =
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(
+ new DoFn<Integer, Integer>() {
+ @FinishBundle
+ public void finishBundle(@SuppressWarnings("unused") Context c) {
+ throw new IllegalArgumentException("bogus");
+ }
+
+ @ProcessElement
+ public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+ });
thrown.expect(UserCodeException.class);
thrown.expectMessage("bogus");
- DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeFinishBundle(null);
+ invoker.invokeFinishBundle(null);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
deleted file mode 100644
index 7bfdddc..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.reflect;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokersTest.Invocations;
-
-/**
- * Test helper for {@link DoFnInvokersTest}, which needs to test package-private access
- * to DoFns in other packages.
- */
-public class DoFnInvokersTestHelper {
-
- private static class StaticPrivateDoFn extends DoFn<String, String> {
- final Invocations invocations;
-
- public StaticPrivateDoFn(Invocations invocations) {
- this.invocations = invocations;
- }
-
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- }
-
- private class InnerPrivateDoFn extends DoFn<String, String> {
- final Invocations invocations;
-
- public InnerPrivateDoFn(Invocations invocations) {
- this.invocations = invocations;
- }
-
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- }
-
- static class StaticPackagePrivateDoFn extends DoFn<String, String> {
- final Invocations invocations;
-
- public StaticPackagePrivateDoFn(Invocations invocations) {
- this.invocations = invocations;
- }
-
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- }
-
- class InnerPackagePrivateDoFn extends DoFn<String, String> {
- final Invocations invocations;
-
- public InnerPackagePrivateDoFn(Invocations invocations) {
- this.invocations = invocations;
- }
-
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- }
-
- public static DoFn<String, String> newStaticPackagePrivateDoFn(
- Invocations invocations) {
- return new StaticPackagePrivateDoFn(invocations);
- }
-
- public DoFn<String, String> newInnerPackagePrivateDoFn(Invocations invocations) {
- return new InnerPackagePrivateDoFn(invocations);
- }
-
- public static DoFn<String, String> newStaticPrivateDoFn(Invocations invocations) {
- return new StaticPrivateDoFn(invocations);
- }
-
- public DoFn<String, String> newInnerPrivateDoFn(Invocations invocations) {
- return new InnerPrivateDoFn(invocations);
- }
-
- public DoFn<String, String> newInnerAnonymousDoFn(final Invocations invocations) {
- return new DoFn<String, String>() {
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- };
- }
-
- public static DoFn<String, String> newStaticAnonymousDoFn(
- final Invocations invocations) {
- return new DoFn<String, String>() {
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
new file mode 100644
index 0000000..c269dbd
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.analyzeProcessElementMethod;
+
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.AnonymousMethod;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link DoFnSignatures} verification of the {@link DoFn.ProcessElement} method. */
+@SuppressWarnings("unused")
+@RunWith(JUnit4.class)
+public class DoFnSignaturesProcessElementTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testMissingProcessContext() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Must take ProcessContext<> as the first argument");
+
+ analyzeProcessElementMethod(
+ new AnonymousMethod() {
+ private void method() {}
+ });
+ }
+
+ @Test
+ public void testBadProcessContextType() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Must take ProcessContext<> as the first argument");
+
+ analyzeProcessElementMethod(
+ new AnonymousMethod() {
+ private void method(String s) {}
+ });
+ }
+
+ @Test
+ public void testBadExtraProcessContextType() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Integer is not a valid context parameter. "
+ + "Should be one of [BoundedWindow]");
+
+ analyzeProcessElementMethod(
+ new AnonymousMethod() {
+ private void method(DoFn<Integer, String>.ProcessContext c, Integer n) {}
+ });
+ }
+
+ @Test
+ public void testBadReturnType() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Must return void");
+
+ analyzeProcessElementMethod(
+ new AnonymousMethod() {
+ private int method(DoFn<Integer, String>.ProcessContext context) {
+ return 0;
+ }
+ });
+ }
+
+ @Test
+ public void testGoodConcreteTypes() throws Exception {
+ analyzeProcessElementMethod(
+ new AnonymousMethod() {
+ private void method(
+ DoFn<Integer, String>.ProcessContext c,
+ DoFn.InputProvider<Integer> input,
+ DoFn.OutputReceiver<String> output) {}
+ });
+ }
+
+ @Test
+ public void testBadGenericsTwoArgs() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Wrong type of OutputReceiver parameter: "
+ + "OutputReceiver<Integer>, should be OutputReceiver<String>");
+
+ analyzeProcessElementMethod(
+ new AnonymousMethod() {
+ private void method(
+ DoFn<Integer, String>.ProcessContext c,
+ DoFn.InputProvider<Integer> input,
+ DoFn.OutputReceiver<Integer> output) {}
+ });
+ }
+
+ @Test
+ public void testBadGenericWildCards() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Wrong type of OutputReceiver parameter: "
+ + "OutputReceiver<? super Integer>, should be OutputReceiver<String>");
+
+ analyzeProcessElementMethod(
+ new AnonymousMethod() {
+ private void method(
+ DoFn<Integer, String>.ProcessContext c,
+ DoFn.InputProvider<Integer> input,
+ DoFn.OutputReceiver<? super Integer> output) {}
+ });
+ }
+
+ static class BadTypeVariables<InputT, OutputT> extends DoFn<InputT, OutputT> {
+ @ProcessElement
+ @SuppressWarnings("unused")
+ public void badTypeVariables(
+ DoFn<InputT, OutputT>.ProcessContext c,
+ DoFn.InputProvider<InputT> input,
+ DoFn.OutputReceiver<InputT> output) {}
+ }
+
+ @Test
+ public void testBadTypeVariables() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Wrong type of OutputReceiver parameter: "
+ + "OutputReceiver<InputT>, should be OutputReceiver<OutputT>");
+
+ DoFnSignatures.INSTANCE.getOrParseSignature(BadTypeVariables.class);
+ }
+
+ @Test
+ public void testNoProcessElement() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("No method annotated with @ProcessElement found");
+ thrown.expectMessage(getClass().getName() + "$");
+ DoFnSignatures.INSTANCE.getOrParseSignature(new DoFn<String, String>() {}.getClass());
+ }
+
+ @Test
+ public void testMultipleProcessElement() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Found multiple methods annotated with @ProcessElement");
+ thrown.expectMessage("foo()");
+ thrown.expectMessage("bar()");
+ thrown.expectMessage(getClass().getName() + "$");
+ DoFnSignatures.INSTANCE.getOrParseSignature(
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void foo() {}
+
+ @ProcessElement
+ public void bar() {}
+ }.getClass());
+ }
+
+ @Test
+ public void testPrivateProcessElement() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("process()");
+ thrown.expectMessage("Must be public");
+ thrown.expectMessage(getClass().getName() + "$");
+ DoFnSignatures.INSTANCE.getOrParseSignature(
+ new DoFn<String, String>() {
+ @ProcessElement
+ private void process() {}
+ }.getClass());
+ }
+
+ private static class GoodTypeVariables<InputT, OutputT> extends DoFn<InputT, OutputT> {
+ @ProcessElement
+ @SuppressWarnings("unused")
+ public void goodTypeVariables(
+ DoFn<InputT, OutputT>.ProcessContext c,
+ DoFn.InputProvider<InputT> input,
+ DoFn.OutputReceiver<OutputT> output) {}
+ }
+
+ @Test
+ public void testGoodTypeVariables() throws Exception {
+ DoFnSignatures.INSTANCE.getOrParseSignature(GoodTypeVariables.class);
+ }
+
+ private static class IdentityFn<T> extends DoFn<T, T> {
+ @ProcessElement
+ @SuppressWarnings("unused")
+ public void processElement(ProcessContext c, InputProvider<T> input, OutputReceiver<T> output) {
+ c.output(c.element());
+ }
+ }
+
+ private static class IdentityListFn<T> extends IdentityFn<List<T>> {}
+
+ @Test
+ public void testIdentityFnApplied() throws Exception {
+ DoFnSignatures.INSTANCE.getOrParseSignature(new IdentityFn<String>() {}.getClass());
+ }
+}