You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/16 05:18:52 UTC
[1/2] incubator-beam git commit: Connect generated
DoFnInvoker.invokeOnTimer to OnTimerInvoker
Repository: incubator-beam
Updated Branches:
refs/heads/master dbbd5e448 -> dc94dbdd7
Connect generated DoFnInvoker.invokeOnTimer to OnTimerInvoker
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a945a025
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a945a025
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a945a025
Branch: refs/heads/master
Commit: a945a025301ca09a4cfc160302ef3914429dc15e
Parents: dbbd5e4
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 1 21:23:48 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 15 20:08:41 2016 -0800
----------------------------------------------------------------------
.../reflect/ByteBuddyDoFnInvokerFactory.java | 152 ++++++++++++++-----
.../reflect/ByteBuddyOnTimerInvokerFactory.java | 24 ++-
.../sdk/transforms/reflect/DoFnInvoker.java | 4 +
.../sdk/transforms/reflect/DoFnInvokers.java | 6 +
.../sdk/transforms/reflect/OnTimerInvoker.java | 2 +-
.../transforms/reflect/DoFnInvokersTest.java | 137 ++++++++++++++++-
.../testhelper/DoFnInvokersTestHelper.java | 137 +++++++++++++++++
7 files changed, 415 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index c137255..bc6d8c9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -23,6 +23,7 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -31,7 +32,6 @@ import net.bytebuddy.ByteBuddy;
import net.bytebuddy.NamingStrategy;
import net.bytebuddy.description.field.FieldDescription;
import net.bytebuddy.description.method.MethodDescription;
-import net.bytebuddy.description.modifier.FieldManifestation;
import net.bytebuddy.description.modifier.Visibility;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.description.type.TypeList;
@@ -65,6 +65,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ContextParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.InputProviderParameter;
@@ -128,6 +129,54 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
return newByteBuddyInvoker(DoFnSignatures.getSignature((Class) fn.getClass()), fn);
}
+ /**
+ * Internal base class for generated {@link DoFnInvoker} instances.
+ *
+ * <p>This class should <i>not</i> be extended directly, or by Beam users. It must be public for
+ * generated instances to have adequate access, as they are generated "inside" the invoked {@link
+ * DoFn} class.
+ */
+ public abstract static class DoFnInvokerBase<InputT, OutputT, DoFnT extends DoFn<InputT, OutputT>>
+ implements DoFnInvoker<InputT, OutputT> {
+ protected DoFnT delegate;
+
+ private Map<String, OnTimerInvoker> onTimerInvokers = new HashMap<>();
+
+ public DoFnInvokerBase(DoFnT delegate) {
+ this.delegate = delegate;
+ }
+
+ /**
+ * Associates the given timer ID with the given {@link OnTimerInvoker}.
+ *
+ * <p>ByteBuddy does not like to generate conditional code, so we use a map + lookup
+ * of the timer ID rather than a generated conditional branch to choose which
+ * OnTimerInvoker to invoke.
+ *
+ * <p>This method has package level access as it is intended only for assembly of the
+ * {@link DoFnInvokerBase} not by any subclass.
+ */
+ void addOnTimerInvoker(String timerId, OnTimerInvoker onTimerInvoker) {
+ this.onTimerInvokers.put(timerId, onTimerInvoker);
+ }
+
+ @Override
+ public void invokeOnTimer(String timerId, DoFn.ArgumentProvider<InputT, OutputT> arguments) {
+ @Nullable OnTimerInvoker onTimerInvoker = onTimerInvokers.get(timerId);
+
+ if (onTimerInvoker != null) {
+ onTimerInvoker.invokeOnTimer(arguments);
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Attempted to invoke timer %s on %s, but that timer is not registered."
+ + " This is the responsibility of the runner, which must only deliver"
+ + " registered timers.",
+ timerId, delegate.getClass().getName()));
+ }
+ }
+ }
+
/** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
DoFnSignature signature, DoFn<InputT, OutputT> fn) {
@@ -136,10 +185,18 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
"Signature is for class %s, but fn is of class %s",
signature.fnClass(),
fn.getClass());
+
try {
@SuppressWarnings("unchecked")
- DoFnInvoker<InputT, OutputT> invoker =
- (DoFnInvoker<InputT, OutputT>) getByteBuddyInvokerConstructor(signature).newInstance(fn);
+ DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>> invoker =
+ (DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>>)
+ getByteBuddyInvokerConstructor(signature).newInstance(fn);
+
+ for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) {
+ invoker.addOnTimerInvoker(onTimerMethod.id(),
+ OnTimerInvokers.forTimer(fn, onTimerMethod.id()));
+ }
+
return invoker;
} catch (InstantiationException
| IllegalAccessException
@@ -214,31 +271,39 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
return super.name(clazzDescription);
}
})
- // Create a subclass of DoFnInvoker
- .subclass(DoFnInvoker.class, ConstructorStrategy.Default.NO_CONSTRUCTORS)
- .defineField(
- FN_DELEGATE_FIELD_NAME, fnClass, Visibility.PRIVATE, FieldManifestation.FINAL)
+ // class <invoker class> extends DoFnInvokerBase {
+ .subclass(DoFnInvokerBase.class, ConstructorStrategy.Default.NO_CONSTRUCTORS)
+
+ // public <invoker class>(<fn class> delegate) { this.delegate = delegate; }
.defineConstructor(Visibility.PUBLIC)
.withParameter(fnClass)
.intercept(new InvokerConstructor())
+
+ // public invokeProcessElement(ProcessContext, ExtraContextFactory) {
+ // delegate.<@ProcessElement>(... pass just the right args ...);
+ // }
.method(ElementMatchers.named("invokeProcessElement"))
- .intercept(new ProcessElementDelegation(signature.processElement()))
+ .intercept(new ProcessElementDelegation(clazzDescription, signature.processElement()))
+
+ // public invokeStartBundle(Context c) { delegate.<@StartBundle>(c); }
+ // ... etc ...
.method(ElementMatchers.named("invokeStartBundle"))
- .intercept(delegateOrNoop(signature.startBundle()))
+ .intercept(delegateOrNoop(clazzDescription, signature.startBundle()))
.method(ElementMatchers.named("invokeFinishBundle"))
- .intercept(delegateOrNoop(signature.finishBundle()))
+ .intercept(delegateOrNoop(clazzDescription, signature.finishBundle()))
.method(ElementMatchers.named("invokeSetup"))
- .intercept(delegateOrNoop(signature.setup()))
+ .intercept(delegateOrNoop(clazzDescription, signature.setup()))
.method(ElementMatchers.named("invokeTeardown"))
- .intercept(delegateOrNoop(signature.teardown()))
+ .intercept(delegateOrNoop(clazzDescription, signature.teardown()))
.method(ElementMatchers.named("invokeGetInitialRestriction"))
- .intercept(delegateWithDowncastOrThrow(signature.getInitialRestriction()))
+ .intercept(
+ delegateWithDowncastOrThrow(clazzDescription, signature.getInitialRestriction()))
.method(ElementMatchers.named("invokeSplitRestriction"))
- .intercept(splitRestrictionDelegation(signature))
+ .intercept(splitRestrictionDelegation(clazzDescription, signature))
.method(ElementMatchers.named("invokeGetRestrictionCoder"))
- .intercept(getRestrictionCoderDelegation(signature))
+ .intercept(getRestrictionCoderDelegation(clazzDescription, signature))
.method(ElementMatchers.named("invokeNewTracker"))
- .intercept(delegateWithDowncastOrThrow(signature.newTracker()));
+ .intercept(delegateWithDowncastOrThrow(clazzDescription, signature.newTracker()));
DynamicType.Unloaded<?> unloaded = builder.make();
@@ -253,13 +318,15 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
return res;
}
- private static Implementation getRestrictionCoderDelegation(DoFnSignature signature) {
+ private static Implementation getRestrictionCoderDelegation(
+ TypeDescription doFnType, DoFnSignature signature) {
if (signature.processElement().isSplittable()) {
if (signature.getRestrictionCoder() == null) {
return MethodDelegation.to(
new DefaultRestrictionCoder(signature.getInitialRestriction().restrictionT()));
} else {
return new DowncastingParametersMethodDelegation(
+ doFnType,
signature.getRestrictionCoder().targetMethod());
}
} else {
@@ -267,26 +334,30 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
}
}
- private static Implementation splitRestrictionDelegation(DoFnSignature signature) {
+ private static Implementation splitRestrictionDelegation(
+ TypeDescription doFnType, DoFnSignature signature) {
if (signature.splitRestriction() == null) {
return MethodDelegation.to(DefaultSplitRestriction.class);
} else {
- return new DowncastingParametersMethodDelegation(signature.splitRestriction().targetMethod());
+ return new DowncastingParametersMethodDelegation(
+ doFnType, signature.splitRestriction().targetMethod());
}
}
/** Delegates to the given method if available, or does nothing. */
- private static Implementation delegateOrNoop(DoFnSignature.DoFnMethod method) {
+ private static Implementation delegateOrNoop(TypeDescription doFnType, DoFnSignature.DoFnMethod
+ method) {
return (method == null)
? FixedValue.originType()
- : new DoFnMethodDelegation(method.targetMethod());
+ : new DoFnMethodDelegation(doFnType, method.targetMethod());
}
/** Delegates to the given method if available, or throws UnsupportedOperationException. */
- private static Implementation delegateWithDowncastOrThrow(DoFnSignature.DoFnMethod method) {
+ private static Implementation delegateWithDowncastOrThrow(
+ TypeDescription doFnType, DoFnSignature.DoFnMethod method) {
return (method == null)
? ExceptionMethod.throwing(UnsupportedOperationException.class)
- : new DowncastingParametersMethodDelegation(method.targetMethod());
+ : new DowncastingParametersMethodDelegation(doFnType, method.targetMethod());
}
/**
@@ -301,7 +372,10 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
protected FieldDescription delegateField;
- public DoFnMethodDelegation(Method targetMethod) {
+ private final TypeDescription doFnType;
+
+ public DoFnMethodDelegation(TypeDescription doFnType, Method targetMethod) {
+ this.doFnType = doFnType;
this.targetMethod = new MethodDescription.ForLoadedMethod(targetMethod);
targetHasReturn = !TypeDescription.VOID.equals(this.targetMethod.getReturnType().asErasure());
}
@@ -311,6 +385,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
// Remember the field description of the instrumented type.
delegateField =
instrumentedType
+ .getSuperClass() // always DoFnInvokerBase
.getDeclaredFields()
.filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
.getOnly();
@@ -349,6 +424,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
MethodVariableAccess.REFERENCE.loadOffset(0),
// Access this.delegate (DoFn on top of the stack)
FieldAccess.forField(delegateField).getter(),
+ // Cast it to the more precise type
+ TypeCasting.to(doFnType),
// Run the beforeDelegation manipulations.
// The arguments necessary to invoke the target are on top of the stack.
beforeDelegation(instrumentedMethod),
@@ -400,8 +477,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
* to its expected type.
*/
private static class DowncastingParametersMethodDelegation extends DoFnMethodDelegation {
- DowncastingParametersMethodDelegation(Method method) {
- super(method);
+ DowncastingParametersMethodDelegation(TypeDescription doFnType, Method method) {
+ super(doFnType, method);
}
@Override
@@ -536,8 +613,9 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
private final DoFnSignature.ProcessElementMethod signature;
/** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */
- private ProcessElementDelegation(DoFnSignature.ProcessElementMethod signature) {
- super(signature.targetMethod());
+ private ProcessElementDelegation(TypeDescription doFnType, DoFnSignature.ProcessElementMethod
+ signature) {
+ super(doFnType, signature.targetMethod());
this.signature = signature;
}
@@ -739,26 +817,16 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
new StackManipulation.Compound(
// Load the this reference
MethodVariableAccess.REFERENCE.loadOffset(0),
+ // Load the delegate argument
+ MethodVariableAccess.REFERENCE.loadOffset(1),
// Invoke the super constructor (default constructor of Object)
MethodInvocation.invoke(
- new TypeDescription.ForLoadedType(Object.class)
+ new TypeDescription.ForLoadedType(DoFnInvokerBase.class)
.getDeclaredMethods()
.filter(
ElementMatchers.isConstructor()
- .and(ElementMatchers.takesArguments(0)))
+ .and(ElementMatchers.takesArguments(DoFn.class)))
.getOnly()),
- // Load the this reference
- MethodVariableAccess.REFERENCE.loadOffset(0),
- // Load the delegate argument
- MethodVariableAccess.REFERENCE.loadOffset(1),
- // Assign the delegate argument to the delegate field
- FieldAccess.forField(
- implementationTarget
- .getInstrumentedType()
- .getDeclaredFields()
- .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
- .getOnly())
- .putter(),
// Return void.
MethodReturn.VOID)
.apply(methodVisitor, implementationContext);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
index 4e53757..7a39ed1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
@@ -180,7 +180,9 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory {
// this.delegate.<@OnTimer method>(... pass the right args ...)
// }
.method(ElementMatchers.named("invokeOnTimer"))
- .intercept(new InvokeOnTimerDelegation(signature.onTimerMethods().get(timerId)));
+ .intercept(
+ new InvokeOnTimerDelegation(
+ clazzDescription, signature.onTimerMethods().get(timerId)));
DynamicType.Unloaded<?> unloaded = builder.make();
@@ -203,12 +205,28 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory {
private final DoFnSignature.OnTimerMethod signature;
- public InvokeOnTimerDelegation(DoFnSignature.OnTimerMethod signature) {
- super(signature.targetMethod());
+ public InvokeOnTimerDelegation(
+ TypeDescription clazzDescription, DoFnSignature.OnTimerMethod signature) {
+ super(clazzDescription, signature.targetMethod());
this.signature = signature;
}
@Override
+ public InstrumentedType prepare(InstrumentedType instrumentedType) {
+ // Remember the field description of the instrumented type.
+ // Kind of a hack to set the protected value, because the instrumentedType
+ // is only available to prepare, while we need this information in
+ // beforeDelegation
+ delegateField =
+ instrumentedType
+ .getDeclaredFields() // the delegate is declared on the OnTimerInvoker
+ .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
+ .getOnly();
+ // Delegating the method call doesn't require any changes to the instrumented type.
+ return instrumentedType;
+ }
+
+ @Override
protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
// Parameters of the wrapper invoker method:
// DoFn.ArgumentProvider
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index ce68d0b..2ae7920 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -50,6 +50,10 @@ public interface DoFnInvoker<InputT, OutputT> {
*/
DoFn.ProcessContinuation invokeProcessElement(DoFn.ArgumentProvider<InputT, OutputT> extra);
+ /** Invoke the appropriate {@link DoFn.OnTimer} method on the bound {@link DoFn}. */
+ void invokeOnTimer(
+ String timerId, DoFn.ArgumentProvider<InputT, OutputT> arguments);
+
/** Invoke the {@link DoFn.GetInitialRestriction} method on the bound {@link DoFn}. */
<RestrictionT> RestrictionT invokeGetInitialRestriction(InputT element);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index 9a96985..7eccaab 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -115,6 +115,12 @@ public class DoFnInvokers {
}
@Override
+ public void invokeOnTimer(String timerId, DoFn.ArgumentProvider<InputT, OutputT> arguments) {
+ throw new UnsupportedOperationException(
+ String.format("Timers are not supported for %s", OldDoFn.class.getSimpleName()));
+ }
+
+ @Override
public void invokeStartBundle(DoFn.Context c) {
OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c);
try {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
index f87fa74..bfcafd0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.transforms.reflect;
import org.apache.beam.sdk.transforms.DoFn;
/** Interface for invoking the {@link DoFn.OnTimer} method for a particular timer. */
-interface OnTimerInvoker<InputT, OutputT> {
+public interface OnTimerInvoker<InputT, OutputT> {
/** Invoke the {@link DoFn.OnTimer} method in the provided context. */
void invokeOnTimer(DoFn.ArgumentProvider<InputT, OutputT> extra);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index c7b71ff..3d9e3ec 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms.reflect;
import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
@@ -45,6 +46,7 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.Timer;
@@ -55,6 +57,7 @@ import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.util.state.StateSpecs;
import org.apache.beam.sdk.util.state.ValueState;
+import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -95,6 +98,10 @@ public class DoFnInvokersTest {
return DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider);
}
+ private void invokeOnTimer(String timerId, DoFn<String, String> fn) {
+ DoFnInvokers.invokerFor(fn).invokeOnTimer(timerId, mockArgumentProvider);
+ }
+
@Test
public void testDoFnInvokersReused() throws Exception {
// Ensures that we don't create a new Invoker class for every instance of the DoFn.
@@ -460,7 +467,79 @@ public class DoFnInvokersTest {
}
// ---------------------------------------------------------------------------------------
- // Tests for ability to invoke private, inner and anonymous classes.
+ // Tests for ability to invoke @OnTimer for private, inner and anonymous classes.
+ // ---------------------------------------------------------------------------------------
+
+ private static final String TIMER_ID = "test-timer-id";
+
+ private static class PrivateDoFnWithTimers extends DoFn<String, String> {
+ @ProcessElement
+ public void processThis(ProcessContext c) {}
+
+ @TimerId(TIMER_ID)
+ private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @OnTimer(TIMER_ID)
+ public void onTimer(BoundedWindow w) {}
+ }
+
+ @Test
+ public void testLocalPrivateDoFnWithTimers() throws Exception {
+ PrivateDoFnWithTimers fn = mock(PrivateDoFnWithTimers.class);
+ invokeOnTimer(TIMER_ID, fn);
+ verify(fn).onTimer(mockWindow);
+ }
+
+ @Test
+ public void testStaticPackagePrivateDoFnWithTimers() throws Exception {
+ DoFn<String, String> fn =
+ mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFnWithTimers().getClass());
+ invokeOnTimer(TIMER_ID, fn);
+ DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFnWithTimers(fn, mockWindow);
+ }
+
+ @Test
+ public void testInnerPackagePrivateDoFnWithTimers() throws Exception {
+ DoFn<String, String> fn =
+ mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFnWithTimers().getClass());
+ invokeOnTimer(TIMER_ID, fn);
+ DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFnWithTimers(fn, mockWindow);
+ }
+
+ @Test
+ public void testStaticPrivateDoFnWithTimers() throws Exception {
+ DoFn<String, String> fn =
+ mock(DoFnInvokersTestHelper.newStaticPrivateDoFnWithTimers().getClass());
+ invokeOnTimer(TIMER_ID, fn);
+ DoFnInvokersTestHelper.verifyStaticPrivateDoFnWithTimers(fn, mockWindow);
+ }
+
+ @Test
+ public void testInnerPrivateDoFnWithTimers() throws Exception {
+ DoFn<String, String> fn =
+ mock(new DoFnInvokersTestHelper().newInnerPrivateDoFnWithTimers().getClass());
+ invokeOnTimer(TIMER_ID, fn);
+ DoFnInvokersTestHelper.verifyInnerPrivateDoFnWithTimers(fn, mockWindow);
+ }
+
+ @Test
+ public void testAnonymousInnerDoFnWithTimers() throws Exception {
+ DoFn<String, String> fn =
+ mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFnWithTimers().getClass());
+ invokeOnTimer(TIMER_ID, fn);
+ DoFnInvokersTestHelper.verifyInnerAnonymousDoFnWithTimers(fn, mockWindow);
+ }
+
+ @Test
+ public void testStaticAnonymousDoFnWithTimersInOtherPackage() throws Exception {
+ // Can't use mockito for this one - the anonymous class is final and can't be mocked.
+ DoFn<String, String> fn = DoFnInvokersTestHelper.newStaticAnonymousDoFnWithTimers();
+ invokeOnTimer(TIMER_ID, fn);
+ DoFnInvokersTestHelper.verifyStaticAnonymousDoFnWithTimersInvoked(fn, mockWindow);
+ }
+
+ // ---------------------------------------------------------------------------------------
+ // Tests for ability to invoke @ProcessElement for private, inner and anonymous classes.
// ---------------------------------------------------------------------------------------
private static class PrivateDoFnClass extends DoFn<String, String> {
@@ -605,6 +684,62 @@ public class DoFnInvokersTest {
invoker.invokeFinishBundle(null);
}
+ @Test
+ public void testOnTimerHelloWord() throws Exception {
+ final String timerId = "my-timer-id";
+
+ class SimpleTimerDoFn extends DoFn<String, String> {
+
+ public String status = "not yet";
+
+ @TimerId(timerId)
+ private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @ProcessElement
+ public void process(ProcessContext c) {}
+
+ @OnTimer(timerId)
+ public void onMyTimer() {
+ status = "OK now";
+ }
+ }
+
+ SimpleTimerDoFn fn = new SimpleTimerDoFn();
+
+ DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
+ invoker.invokeOnTimer(timerId, mockArgumentProvider);
+ assertThat(fn.status, equalTo("OK now"));
+ }
+
+ @Test
+ public void testOnTimerWithWindow() throws Exception {
+ final String timerId = "my-timer-id";
+ final IntervalWindow testWindow = new IntervalWindow(new Instant(0), new Instant(15));
+ when(mockArgumentProvider.window()).thenReturn(testWindow);
+
+ class SimpleTimerDoFn extends DoFn<String, String> {
+
+ public IntervalWindow window = null;
+
+ @TimerId(timerId)
+ private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @ProcessElement
+ public void process(ProcessContext c) {}
+
+ @OnTimer(timerId)
+ public void onMyTimer(IntervalWindow w) {
+ window = w;
+ }
+ }
+
+ SimpleTimerDoFn fn = new SimpleTimerDoFn();
+
+ DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
+ invoker.invokeOnTimer(timerId, mockArgumentProvider);
+ assertThat(fn.window, equalTo(testWindow));
+ }
+
private class OldDoFnIdentity extends OldDoFn<String, String> {
public void processElement(ProcessContext c) {}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/testhelper/DoFnInvokersTestHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/testhelper/DoFnInvokersTestHelper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/testhelper/DoFnInvokersTestHelper.java
index c20a788..95e7c49 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/testhelper/DoFnInvokersTestHelper.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/testhelper/DoFnInvokersTestHelper.java
@@ -23,6 +23,10 @@ import static org.mockito.Mockito.verify;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokersTest;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
/**
* Test helper for {@link DoFnInvokersTest}, which needs to test package-private access to DoFns in
@@ -121,4 +125,137 @@ public class DoFnInvokersTestHelper {
fn.getClass().getMethod("verify", DoFn.ProcessContext.class).invoke(fn, context);
}
+
+ //
+ // Classes for testing OnTimer methods when the DoFn does not live in the same package
+ //
+
+ private static final String TIMER_ID = "test-timer-id";
+
+ private static class StaticPrivateDoFnWithTimers extends DoFn<String, String> {
+ @TimerId(TIMER_ID)
+ private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @OnTimer(TIMER_ID)
+ public void onTimer(BoundedWindow w) {}
+
+ @ProcessElement
+ public void process(ProcessContext c) {}
+ }
+
+ private class InnerPrivateDoFnWithTimers extends DoFn<String, String> {
+ @TimerId(TIMER_ID)
+ private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @OnTimer(TIMER_ID)
+ public void onTimer(BoundedWindow w) {}
+
+ @ProcessElement
+ public void process(ProcessContext c) {}
+ }
+
+ static class StaticPackagePrivateDoFnWithTimers extends DoFn<String, String> {
+ @TimerId(TIMER_ID)
+ private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @OnTimer(TIMER_ID)
+ public void onTimer(BoundedWindow w) {}
+
+ @ProcessElement
+ public void process(ProcessContext c) {}
+ }
+
+ class InnerPackagePrivateDoFnWithTimers extends DoFn<String, String> {
+ @TimerId(TIMER_ID)
+ private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @OnTimer(TIMER_ID)
+ public void onTimer(BoundedWindow w) {}
+
+ @ProcessElement
+ public void process(ProcessContext c) {}
+ }
+
+ public static DoFn<String, String> newStaticPackagePrivateDoFnWithTimers() {
+ return new StaticPackagePrivateDoFnWithTimers();
+ }
+
+ public static void verifyStaticPackagePrivateDoFnWithTimers(
+ DoFn<String, String> fn, BoundedWindow window) {
+ verify((StaticPackagePrivateDoFnWithTimers) fn).onTimer(window);
+ }
+
+ public DoFn<String, String> newInnerPackagePrivateDoFnWithTimers() {
+ return new InnerPackagePrivateDoFnWithTimers();
+ }
+
+ public static void verifyInnerPackagePrivateDoFnWithTimers(
+ DoFn<String, String> fn, BoundedWindow window) {
+ verify((InnerPackagePrivateDoFnWithTimers) fn).onTimer(window);
+ }
+
+ public static DoFn<String, String> newStaticPrivateDoFnWithTimers() {
+ return new StaticPrivateDoFnWithTimers();
+ }
+
+ public static void verifyStaticPrivateDoFnWithTimers(
+ DoFn<String, String> fn, BoundedWindow window) {
+ verify((StaticPrivateDoFnWithTimers) fn).onTimer(window);
+ }
+
+ public DoFn<String, String> newInnerPrivateDoFnWithTimers() {
+ return new InnerPrivateDoFnWithTimers();
+ }
+
+ public static void verifyInnerPrivateDoFnWithTimers(
+ DoFn<String, String> fn, BoundedWindow window) {
+ verify((InnerPrivateDoFnWithTimers) fn).onTimer(window);
+ }
+
+ public DoFn<String, String> newInnerAnonymousDoFnWithTimers() {
+ return new DoFn<String, String>() {
+ @TimerId(TIMER_ID)
+ private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @OnTimer(TIMER_ID)
+ public void onTimer(BoundedWindow w) {}
+
+ @ProcessElement
+ public void process(ProcessContext c) {}
+ };
+ }
+
+ public static void verifyInnerAnonymousDoFnWithTimers(
+ DoFn<String, String> fn, BoundedWindow window) throws Exception {
+ DoFn<String, String> verifier = verify(fn);
+ verifier.getClass().getMethod("onTimer", BoundedWindow.class).invoke(verifier, window);
+ }
+
+ public static DoFn<String, String> newStaticAnonymousDoFnWithTimers() {
+ return new DoFn<String, String>() {
+ private BoundedWindow invokedWindow;
+
+ @ProcessElement
+ public void process(ProcessContext c) {}
+
+ @TimerId(TIMER_ID)
+ private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @OnTimer(TIMER_ID)
+ public void onTimer(BoundedWindow window) {
+ assertNull("Should have been invoked just once", invokedWindow);
+ invokedWindow = window;
+ }
+
+ @SuppressWarnings("unused")
+ public void verify(BoundedWindow window) {
+ assertEquals(window, invokedWindow);
+ }
+ };
+ }
+
+ public static void verifyStaticAnonymousDoFnWithTimersInvoked(
+ DoFn<String, String> fn, BoundedWindow window) throws Exception {
+ fn.getClass().getMethod("verify", BoundedWindow.class).invoke(fn, window);
+ }
}
[2/2] incubator-beam git commit: This closes #1307
Posted by ke...@apache.org.
This closes #1307
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dc94dbdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dc94dbdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dc94dbdd
Branch: refs/heads/master
Commit: dc94dbdd7b93a98f1dbe7a616f5134b95be4563c
Parents: dbbd5e4 a945a02
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 15 21:18:31 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 15 21:18:31 2016 -0800
----------------------------------------------------------------------
.../reflect/ByteBuddyDoFnInvokerFactory.java | 152 ++++++++++++++-----
.../reflect/ByteBuddyOnTimerInvokerFactory.java | 24 ++-
.../sdk/transforms/reflect/DoFnInvoker.java | 4 +
.../sdk/transforms/reflect/DoFnInvokers.java | 6 +
.../sdk/transforms/reflect/OnTimerInvoker.java | 2 +-
.../transforms/reflect/DoFnInvokersTest.java | 137 ++++++++++++++++-
.../testhelper/DoFnInvokersTestHelper.java | 137 +++++++++++++++++
7 files changed, 415 insertions(+), 47 deletions(-)
----------------------------------------------------------------------