You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/10 22:18:22 UTC
[2/4] incubator-beam git commit: Remove OnTimerInvokers.INSTANCE;
deprecate DoFnInvokers.INSTANCE
Remove OnTimerInvokers.INSTANCE; deprecate DoFnInvokers.INSTANCE
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/14a71e43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/14a71e43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/14a71e43
Branch: refs/heads/master
Commit: 14a71e435acd9435ce02afe774df3adebd7355f0
Parents: efad9d4
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Nov 7 23:03:46 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Nov 10 14:18:07 2016 -0800
----------------------------------------------------------------------
.../beam/runners/core/SimpleDoFnRunner.java | 2 +-
.../beam/runners/core/SplittableParDo.java | 10 +-
.../runners/direct/DoFnLifecycleManager.java | 4 +-
.../beam/sdk/transforms/DoFnAdapters.java | 4 +-
.../reflect/ByteBuddyDoFnInvokerFactory.java | 828 +++++++++++++++++++
.../reflect/ByteBuddyOnTimerInvokerFactory.java | 279 +++++++
.../transforms/reflect/DoFnInvokerFactory.java | 27 +
.../sdk/transforms/reflect/DoFnInvokers.java | 711 +---------------
.../reflect/OnTimerInvokerFactory.java | 36 +
.../sdk/transforms/reflect/OnTimerInvokers.java | 243 +-----
.../transforms/reflect/DoFnInvokersTest.java | 24 +-
.../transforms/reflect/OnTimerInvokersTest.java | 2 +-
.../transforms/DoFnInvokersBenchmark.java | 2 +-
13 files changed, 1227 insertions(+), 945 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 2c5a850..3b784d1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -97,7 +97,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
this.fn = fn;
this.observesWindow =
DoFnSignatures.getSignature(fn.getClass()).processElement().observesWindow();
- this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+ this.invoker = DoFnInvokers.invokerFor(fn);
this.outputManager = outputManager;
this.mainOutputTag = mainOutputTag;
this.context =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 7143ffe..e05ba56 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -102,8 +102,8 @@ public class SplittableParDo<
public PCollection<OutputT> apply(PCollection<InputT> input) {
PCollection.IsBounded isFnBounded = signature.isBoundedPerElement();
Coder<RestrictionT> restrictionCoder =
- DoFnInvokers.INSTANCE
- .newByteBuddyInvoker(fn)
+ DoFnInvokers
+ .invokerFor(fn)
.invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry());
Coder<ElementAndRestriction<InputT, RestrictionT>> splitCoder =
ElementAndRestrictionCoder.of(input.getCoder(), restrictionCoder);
@@ -166,7 +166,7 @@ public class SplittableParDo<
@Setup
public void setup() {
- invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+ invoker = DoFnInvokers.invokerFor(fn);
}
@ProcessElement
@@ -246,7 +246,7 @@ public class SplittableParDo<
@Override
public void setup() throws Exception {
- invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+ invoker = DoFnInvokers.invokerFor(fn);
}
@Override
@@ -460,7 +460,7 @@ public class SplittableParDo<
@Setup
public void setup() {
- invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(splittableFn);
+ invoker = DoFnInvokers.invokerFor(splittableFn);
}
@ProcessElement
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
index 472b28b..67d957c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
@@ -99,7 +99,7 @@ class DoFnLifecycleManager {
public DoFn<?, ?> load(Thread key) throws Exception {
DoFn<?, ?> fn = (DoFn<?, ?>) SerializableUtils.deserializeFromByteArray(original,
"DoFn Copy in thread " + key.getName());
- DoFnInvokers.INSTANCE.invokerFor(fn).invokeSetup();
+ DoFnInvokers.invokerFor(fn).invokeSetup();
return fn;
}
}
@@ -108,7 +108,7 @@ class DoFnLifecycleManager {
@Override
public void onRemoval(RemovalNotification<Thread, DoFn<?, ?>> notification) {
try {
- DoFnInvokers.INSTANCE.newByteBuddyInvoker(notification.getValue()).invokeTeardown();
+ DoFnInvokers.invokerFor(notification.getValue()).invokeTeardown();
} catch (Exception e) {
thrownOnTeardown.put(notification.getKey(), e);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index a5e1c21..086b985 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -201,7 +201,7 @@ public class DoFnAdapters {
SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
super(fn.aggregators);
this.fn = fn;
- this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+ this.invoker = DoFnInvokers.invokerFor(fn);
}
@Override
@@ -254,7 +254,7 @@ public class DoFnAdapters {
private void readObject(java.io.ObjectInputStream in)
throws IOException, ClassNotFoundException {
in.defaultReadObject();
- this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+ this.invoker = DoFnInvokers.invokerFor(fn);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
new file mode 100644
index 0000000..825aa09
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -0,0 +1,828 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.NamingStrategy;
+import net.bytebuddy.description.field.FieldDescription;
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.description.modifier.FieldManifestation;
+import net.bytebuddy.description.modifier.Visibility;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.description.type.TypeList;
+import net.bytebuddy.dynamic.DynamicType;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.dynamic.scaffold.InstrumentedType;
+import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
+import net.bytebuddy.implementation.ExceptionMethod;
+import net.bytebuddy.implementation.FixedValue;
+import net.bytebuddy.implementation.Implementation;
+import net.bytebuddy.implementation.Implementation.Context;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.TargetMethodAnnotationDrivenBinder;
+import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
+import net.bytebuddy.implementation.bytecode.StackManipulation;
+import net.bytebuddy.implementation.bytecode.Throw;
+import net.bytebuddy.implementation.bytecode.assign.Assigner;
+import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import net.bytebuddy.implementation.bytecode.constant.TextConstant;
+import net.bytebuddy.implementation.bytecode.member.FieldAccess;
+import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import net.bytebuddy.implementation.bytecode.member.MethodReturn;
+import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
+import net.bytebuddy.jar.asm.Label;
+import net.bytebuddy.jar.asm.MethodVisitor;
+import net.bytebuddy.jar.asm.Opcodes;
+import net.bytebuddy.jar.asm.Type;
+import net.bytebuddy.matcher.ElementMatchers;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.InputProviderParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Dynamically generates {@link DoFnInvoker} instances for invoking a {@link DoFn}. */
+public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
+
+ /**
+ * Returns a {@link ByteBuddyDoFnInvokerFactory} shared with all other invocations, so that its
+ * cache of generated classes is global.
+ */
+ public static ByteBuddyDoFnInvokerFactory only() {
+ return INSTANCE;
+ }
+
+ /**
+ * Creates a {@link DoFnInvoker} for the given {@link DoFn} by generating bytecode that directly
+ * invokes its methods with arguments extracted from the {@link ExtraContextFactory}.
+ */
+ @Override
+ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(DoFn<InputT, OutputT> fn) {
+ return newByteBuddyInvoker(fn);
+ }
+
+ private static final ByteBuddyDoFnInvokerFactory INSTANCE = new ByteBuddyDoFnInvokerFactory();
+
+ private static final String FN_DELEGATE_FIELD_NAME = "delegate";
+
+ /**
+ * A cache of constructors of generated {@link DoFnInvoker} classes, keyed by {@link DoFn} class.
+ * Needed because generating an invoker class is expensive, and to avoid generating an excessive
+ * number of classes consuming PermGen memory.
+ */
+ private final Map<Class<?>, Constructor<?>> byteBuddyInvokerConstructorCache =
+ new LinkedHashMap<>();
+
+ private ByteBuddyDoFnInvokerFactory() {}
+
+ static class OldDoFnInvoker<InputT, OutputT> implements DoFnInvoker<InputT, OutputT> {
+
+ private final OldDoFn<InputT, OutputT> fn;
+
+ public OldDoFnInvoker(OldDoFn<InputT, OutputT> fn) {
+ this.fn = fn;
+ }
+
+ @Override
+ public DoFn.ProcessContinuation invokeProcessElement(
+ DoFn<InputT, OutputT>.ProcessContext c, ExtraContextFactory<InputT, OutputT> extra) {
+ OldDoFn<InputT, OutputT>.ProcessContext oldCtx =
+ DoFnAdapters.adaptProcessContext(fn, c, extra);
+ try {
+ fn.processElement(oldCtx);
+ return DoFn.ProcessContinuation.stop();
+ } catch (Throwable exc) {
+ throw UserCodeException.wrap(exc);
+ }
+ }
+
+ @Override
+ public void invokeStartBundle(DoFn.Context c) {
+ OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c);
+ try {
+ fn.startBundle(oldCtx);
+ } catch (Throwable exc) {
+ throw UserCodeException.wrap(exc);
+ }
+ }
+
+ @Override
+ public void invokeFinishBundle(DoFn.Context c) {
+ OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c);
+ try {
+ fn.finishBundle(oldCtx);
+ } catch (Throwable exc) {
+ throw UserCodeException.wrap(exc);
+ }
+ }
+
+ @Override
+ public void invokeSetup() {
+ try {
+ fn.setup();
+ } catch (Throwable exc) {
+ throw UserCodeException.wrap(exc);
+ }
+ }
+
+ @Override
+ public void invokeTeardown() {
+ try {
+ fn.teardown();
+ } catch (Throwable exc) {
+ throw UserCodeException.wrap(exc);
+ }
+ }
+
+ @Override
+ public <RestrictionT> RestrictionT invokeGetInitialRestriction(InputT element) {
+ throw new UnsupportedOperationException("OldDoFn is not splittable");
+ }
+
+ @Override
+ public <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(
+ CoderRegistry coderRegistry) {
+ throw new UnsupportedOperationException("OldDoFn is not splittable");
+ }
+
+ @Override
+ public <RestrictionT> void invokeSplitRestriction(
+ InputT element, RestrictionT restriction, DoFn.OutputReceiver<RestrictionT> receiver) {
+ throw new UnsupportedOperationException("OldDoFn is not splittable");
+ }
+
+ @Override
+ public <RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+ TrackerT invokeNewTracker(RestrictionT restriction) {
+ throw new UnsupportedOperationException("OldDoFn is not splittable");
+ }
+ }
+
+ /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
+ DoFn<InputT, OutputT> fn) {
+ return newByteBuddyInvoker(
+ DoFnSignatures.getSignature((Class) fn.getClass()), fn);
+ }
+
+ /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
+ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
+ DoFnSignature signature, DoFn<InputT, OutputT> fn) {
+ checkArgument(
+ signature.fnClass().equals(fn.getClass()),
+ "Signature is for class %s, but fn is of class %s",
+ signature.fnClass(),
+ fn.getClass());
+ try {
+ @SuppressWarnings("unchecked")
+ DoFnInvoker<InputT, OutputT> invoker =
+ (DoFnInvoker<InputT, OutputT>)
+ getByteBuddyInvokerConstructor(signature).newInstance(fn);
+ return invoker;
+ } catch (InstantiationException
+ | IllegalAccessException
+ | IllegalArgumentException
+ | InvocationTargetException
+ | SecurityException e) {
+ throw new RuntimeException("Unable to bind invoker for " + fn.getClass(), e);
+ }
+ }
+
+ /**
+ * Returns a generated constructor for a {@link DoFnInvoker} for the given {@link DoFn} class.
+ *
+ * <p>These are cached such that at most one {@link DoFnInvoker} class exists for a given
+ * {@link DoFn} class.
+ */
+ private synchronized Constructor<?> getByteBuddyInvokerConstructor(
+ DoFnSignature signature) {
+ Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
+ Constructor<?> constructor = byteBuddyInvokerConstructorCache.get(fnClass);
+ if (constructor == null) {
+ Class<? extends DoFnInvoker<?, ?>> invokerClass = generateInvokerClass(signature);
+ try {
+ constructor = invokerClass.getConstructor(fnClass);
+ } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
+ throw new RuntimeException(e);
+ }
+ byteBuddyInvokerConstructorCache.put(fnClass, constructor);
+ }
+ return constructor;
+ }
+
+ /** Default implementation of {@link DoFn.SplitRestriction}, for delegation by bytebuddy. */
+ public static class DefaultSplitRestriction {
+ /** Doesn't split the restriction. */
+ @SuppressWarnings("unused")
+ public static <InputT, RestrictionT> void invokeSplitRestriction(
+ InputT element, RestrictionT restriction, DoFn.OutputReceiver<RestrictionT> receiver) {
+ receiver.output(restriction);
+ }
+ }
+
+ /** Default implementation of {@link DoFn.GetRestrictionCoder}, for delegation by bytebuddy. */
+ public static class DefaultRestrictionCoder {
+ private final TypeDescriptor<?> restrictionType;
+
+ DefaultRestrictionCoder(TypeDescriptor<?> restrictionType) {
+ this.restrictionType = restrictionType;
+ }
+
+ /** Returns the coder that the given registry provides for the restriction type. */
+ @SuppressWarnings({"unused", "unchecked"})
+ public <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(CoderRegistry registry)
+ throws CannotProvideCoderException {
+ return (Coder) registry.getCoder(restrictionType);
+ }
+ }
+
+ /** Generates a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */
+ private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(DoFnSignature signature) {
+ Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
+
+ final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass);
+
+ DynamicType.Builder<?> builder =
+ new ByteBuddy()
+ // Create subclasses inside the target class, to have access to
+ // private and package-private bits
+ .with(
+ new NamingStrategy.SuffixingRandom("auxiliary") {
+ @Override
+ public String subclass(TypeDescription.Generic superClass) {
+ return super.name(clazzDescription);
+ }
+ })
+ // Create a subclass of DoFnInvoker
+ .subclass(DoFnInvoker.class, ConstructorStrategy.Default.NO_CONSTRUCTORS)
+ .defineField(
+ FN_DELEGATE_FIELD_NAME, fnClass, Visibility.PRIVATE, FieldManifestation.FINAL)
+ .defineConstructor(Visibility.PUBLIC)
+ .withParameter(fnClass)
+ .intercept(new InvokerConstructor())
+ .method(ElementMatchers.named("invokeProcessElement"))
+ .intercept(new ProcessElementDelegation(signature.processElement()))
+ .method(ElementMatchers.named("invokeStartBundle"))
+ .intercept(delegateOrNoop(signature.startBundle()))
+ .method(ElementMatchers.named("invokeFinishBundle"))
+ .intercept(delegateOrNoop(signature.finishBundle()))
+ .method(ElementMatchers.named("invokeSetup"))
+ .intercept(delegateOrNoop(signature.setup()))
+ .method(ElementMatchers.named("invokeTeardown"))
+ .intercept(delegateOrNoop(signature.teardown()))
+ .method(ElementMatchers.named("invokeGetInitialRestriction"))
+ .intercept(delegateWithDowncastOrThrow(signature.getInitialRestriction()))
+ .method(ElementMatchers.named("invokeSplitRestriction"))
+ .intercept(splitRestrictionDelegation(signature))
+ .method(ElementMatchers.named("invokeGetRestrictionCoder"))
+ .intercept(getRestrictionCoderDelegation(signature))
+ .method(ElementMatchers.named("invokeNewTracker"))
+ .intercept(delegateWithDowncastOrThrow(signature.newTracker()));
+
+ DynamicType.Unloaded<?> unloaded = builder.make();
+
+ @SuppressWarnings("unchecked")
+ Class<? extends DoFnInvoker<?, ?>> res =
+ (Class<? extends DoFnInvoker<?, ?>>)
+ unloaded
+ .load(
+ ByteBuddyDoFnInvokerFactory.class.getClassLoader(),
+ ClassLoadingStrategy.Default.INJECTION)
+ .getLoaded();
+ return res;
+ }
+
+ private static Implementation getRestrictionCoderDelegation(DoFnSignature signature) {
+ if (signature.processElement().isSplittable()) {
+ if (signature.getRestrictionCoder() == null) {
+ return MethodDelegation.to(
+ new DefaultRestrictionCoder(signature.getInitialRestriction().restrictionT()));
+ } else {
+ return new DowncastingParametersMethodDelegation(
+ signature.getRestrictionCoder().targetMethod());
+ }
+ } else {
+ return ExceptionMethod.throwing(UnsupportedOperationException.class);
+ }
+ }
+
+ private static Implementation splitRestrictionDelegation(DoFnSignature signature) {
+ if (signature.splitRestriction() == null) {
+ return MethodDelegation.to(DefaultSplitRestriction.class);
+ } else {
+ return new DowncastingParametersMethodDelegation(signature.splitRestriction().targetMethod());
+ }
+ }
+
+ /** Delegates to the given method if available, or does nothing. */
+ private static Implementation delegateOrNoop(DoFnSignature.DoFnMethod method) {
+ return (method == null)
+ ? FixedValue.originType()
+ : new DoFnMethodDelegation(method.targetMethod());
+ }
+
+ /** Delegates to the given method if available, or throws UnsupportedOperationException. */
+ private static Implementation delegateWithDowncastOrThrow(DoFnSignature.DoFnMethod method) {
+ return (method == null)
+ ? ExceptionMethod.throwing(UnsupportedOperationException.class)
+ : new DowncastingParametersMethodDelegation(method.targetMethod());
+ }
+
+ /**
+ * Implements a method of {@link DoFnInvoker} (the "instrumented method") by delegating to a
+ * "target method" of the wrapped {@link DoFn}.
+ */
+ static class DoFnMethodDelegation implements Implementation {
+ /** The {@link MethodDescription} of the wrapped {@link DoFn}'s method. */
+ protected final MethodDescription targetMethod;
+ /** Whether the target method returns non-void. */
+ private final boolean targetHasReturn;
+
+ private FieldDescription delegateField;
+
+ public DoFnMethodDelegation(Method targetMethod) {
+ this.targetMethod = new MethodDescription.ForLoadedMethod(targetMethod);
+ targetHasReturn = !TypeDescription.VOID.equals(this.targetMethod.getReturnType().asErasure());
+ }
+
+ @Override
+ public InstrumentedType prepare(InstrumentedType instrumentedType) {
+ // Remember the field description of the instrumented type.
+ delegateField =
+ instrumentedType
+ .getDeclaredFields()
+ .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
+ .getOnly();
+ // Delegating the method call doesn't require any changes to the instrumented type.
+ return instrumentedType;
+ }
+
+ @Override
+ public ByteCodeAppender appender(final Target implementationTarget) {
+ return new ByteCodeAppender() {
+ /**
+ * @param instrumentedMethod The {@link DoFnInvoker} method for which we're generating code.
+ */
+ @Override
+ public Size apply(
+ MethodVisitor methodVisitor,
+ Context implementationContext,
+ MethodDescription instrumentedMethod) {
+ // Figure out how many locals we'll need. This corresponds to "this", the parameters
+ // of the instrumented method, and a local variable to hold the return value if the target
+ // method has a return value.
+ int numLocals = 1 + instrumentedMethod.getParameters().size() + (targetHasReturn ? 1 : 0);
+
+ Integer returnVarIndex = null;
+ if (targetHasReturn) {
+ // Local comes after formal parameters, so figure out where that is.
+ returnVarIndex = 1; // "this"
+ for (Type param : Type.getArgumentTypes(instrumentedMethod.getDescriptor())) {
+ returnVarIndex += param.getSize();
+ }
+ }
+
+ StackManipulation manipulation =
+ new StackManipulation.Compound(
+ // Push "this" (DoFnInvoker on top of the stack)
+ MethodVariableAccess.REFERENCE.loadOffset(0),
+ // Access this.delegate (DoFn on top of the stack)
+ FieldAccess.forField(delegateField).getter(),
+ // Run the beforeDelegation manipulations.
+ // The arguments necessary to invoke the target are on top of the stack.
+ beforeDelegation(instrumentedMethod),
+ // Perform the method delegation.
+ // This will consume the arguments on top of the stack
+ // Either the stack is now empty (because the targetMethod returns void) or the
+ // stack contains the return value.
+ new UserCodeMethodInvocation(returnVarIndex, targetMethod, instrumentedMethod),
+ // Run the afterDelegation manipulations.
+ // Either the stack is now empty (because the instrumentedMethod returns void)
+ // or the stack contains the return value.
+ afterDelegation(instrumentedMethod));
+
+ StackManipulation.Size size = manipulation.apply(methodVisitor, implementationContext);
+ return new Size(size.getMaximalSize(), numLocals);
+ }
+ };
+ }
+
+ /**
+ * Return the code to prepare the operand stack for the method delegation.
+ *
+ * <p>Before this method is called, the delegate will be the only thing on the stack.
+ *
+ * <p>After this method is called, the stack contents should contain exactly the arguments
+ * necessary to invoke the target method.
+ */
+ protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
+ return MethodVariableAccess.allArgumentsOf(targetMethod);
+ }
+
+ /**
+ * Return the code to execute after the method delegation.
+ *
+ * <p>Before this method is called, the stack will either be empty (if the target method returns
+ * void) or contain the method return value.
+ *
+ * <p>After this method is called, the stack should either be empty (if the instrumented method
+ * returns void) or contain the value for the instrumented method to return.
+ */
+ protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
+ return TargetMethodAnnotationDrivenBinder.TerminationHandler.Returning.INSTANCE.resolve(
+ Assigner.DEFAULT, instrumentedMethod, targetMethod);
+ }
+ }
+
+ /**
+ * Passes parameters to the delegated method by downcasting each parameter of non-primitive type
+ * to its expected type.
+ */
+ private static class DowncastingParametersMethodDelegation extends DoFnMethodDelegation {
+ DowncastingParametersMethodDelegation(Method method) {
+ super(method);
+ }
+
+ @Override
+ protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
+ List<StackManipulation> pushParameters = new ArrayList<>();
+ TypeList.Generic paramTypes = targetMethod.getParameters().asTypeList();
+ for (int i = 0; i < paramTypes.size(); i++) {
+ TypeDescription.Generic paramT = paramTypes.get(i);
+ pushParameters.add(MethodVariableAccess.of(paramT).loadOffset(i + 1));
+ if (!paramT.isPrimitive()) {
+ pushParameters.add(TypeCasting.to(paramT));
+ }
+ }
+ return new StackManipulation.Compound(pushParameters);
+ }
+ }
+
+ /**
+ * This wrapper exists to convert checked exceptions to unchecked exceptions, since if this fails
+ * the library itself is malformed.
+ */
+ private static MethodDescription getExtraContextFactoryMethodDescription(
+ String methodName, Class<?>... parameterTypes) {
+ try {
+ return new MethodDescription.ForLoadedMethod(
+ ExtraContextFactory.class.getMethod(methodName, parameterTypes));
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ String.format(
+ "Failed to locate required method %s.%s",
+ ExtraContextFactory.class.getSimpleName(), methodName),
+ e);
+ }
+ }
+
+ private static StackManipulation simpleExtraContextParameter(
+ String methodName,
+ StackManipulation pushExtraContextFactory) {
+ return new StackManipulation.Compound(
+ pushExtraContextFactory,
+ MethodInvocation.invoke(getExtraContextFactoryMethodDescription(methodName)));
+ }
+
+ static StackManipulation getExtraContextParameter(
+ DoFnSignature.Parameter parameter,
+ final StackManipulation pushExtraContextFactory) {
+
+ return parameter.match(
+ new Cases<StackManipulation>() {
+
+ @Override
+ public StackManipulation dispatch(WindowParameter p) {
+ return new StackManipulation.Compound(
+ simpleExtraContextParameter("window", pushExtraContextFactory),
+ TypeCasting.to(new TypeDescription.ForLoadedType(p.windowT().getRawType())));
+ }
+
+ @Override
+ public StackManipulation dispatch(InputProviderParameter p) {
+ return simpleExtraContextParameter("inputProvider", pushExtraContextFactory);
+ }
+
+ @Override
+ public StackManipulation dispatch(OutputReceiverParameter p) {
+ return simpleExtraContextParameter("outputReceiver", pushExtraContextFactory);
+ }
+
+ @Override
+ public StackManipulation dispatch(RestrictionTrackerParameter p) {
+ // ExtraContextFactory.restrictionTracker() returns a RestrictionTracker,
+ // but the @ProcessElement method expects a concrete subtype of it.
+ // Insert a downcast.
+ return new StackManipulation.Compound(
+ simpleExtraContextParameter("restrictionTracker", pushExtraContextFactory),
+ TypeCasting.to(new TypeDescription.ForLoadedType(p.trackerT().getRawType())));
+ }
+
+ @Override
+ public StackManipulation dispatch(StateParameter p) {
+ return new StackManipulation.Compound(
+ // TOP = extraContextFactory.state(<id>)
+ pushExtraContextFactory,
+ new TextConstant(p.referent().id()),
+ MethodInvocation.invoke(
+ getExtraContextFactoryMethodDescription("state", String.class)),
+ TypeCasting.to(
+ new TypeDescription.ForLoadedType(p.referent().stateType().getRawType())));
+ }
+
+ @Override
+ public StackManipulation dispatch(TimerParameter p) {
+ return new StackManipulation.Compound(
+ // TOP = extraContextFactory.timer(<id>)
+ pushExtraContextFactory,
+ new TextConstant(p.referent().id()),
+ MethodInvocation.invoke(
+ getExtraContextFactoryMethodDescription("timer", String.class)),
+ TypeCasting.to(new TypeDescription.ForLoadedType(Timer.class)));
+ }
+ });
+ }
+
+ /**
+ * Implements the invoker's {@link DoFnInvoker#invokeProcessElement} method by delegating to the
+ * {@link ProcessElement} method.
+ */
+ private static final class ProcessElementDelegation extends DoFnMethodDelegation {
+ private static final MethodDescription PROCESS_CONTINUATION_STOP_METHOD;
+
+ static {
+ try {
+ // Resolve ProcessContinuation.stop() once; afterDelegation invokes it for void targets.
+ PROCESS_CONTINUATION_STOP_METHOD =
+ new MethodDescription.ForLoadedMethod(DoFn.ProcessContinuation.class.getMethod("stop"));
+ } catch (NoSuchMethodException e) {
+ // Keep the original exception as the cause so the failure remains diagnosable.
+ throw new RuntimeException("Failed to locate ProcessContinuation.stop()", e);
+ }
+ }
+
+ private final DoFnSignature.ProcessElementMethod signature;
+
+ /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */
+ private ProcessElementDelegation(DoFnSignature.ProcessElementMethod signature) {
+ super(signature.targetMethod());
+ this.signature = signature;
+ }
+
+ @Override
+ protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
+ // Parameters of the wrapper invoker method:
+ // DoFn.ProcessContext, ExtraContextFactory.
+ // Parameters of the wrapped DoFn method:
+ // DoFn.ProcessContext, [BoundedWindow, InputProvider, OutputReceiver] in any order
+ ArrayList<StackManipulation> pushParameters = new ArrayList<>();
+ // Push the ProcessContext argument.
+ pushParameters.add(MethodVariableAccess.REFERENCE.loadOffset(1));
+ // Push the extra arguments in their actual order.
+ StackManipulation pushExtraContextFactory = MethodVariableAccess.REFERENCE.loadOffset(2);
+ for (DoFnSignature.Parameter param : signature.extraParameters()) {
+ pushParameters.add(getExtraContextParameter(param, pushExtraContextFactory));
+ }
+ return new StackManipulation.Compound(pushParameters);
+ }
+
+ @Override
+ protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
+ if (TypeDescription.VOID.equals(targetMethod.getReturnType().asErasure())) {
+ return new StackManipulation.Compound(
+ MethodInvocation.invoke(PROCESS_CONTINUATION_STOP_METHOD), MethodReturn.REFERENCE);
+ } else {
+ return MethodReturn.returning(targetMethod.getReturnType().asErasure());
+ }
+ }
+ }
+
+ /**
+  * A {@link StackManipulation} that invokes the target (user) method inside a try/catch block.
+  * Any {@link Throwable} raised by the target method is passed to {@code UserCodeException.wrap}
+  * and the result is thrown.
+  *
+  * <p>When the target method returns a value, that value is stored into the local variable slot
+  * {@code returnVarIndex} inside the try block and loaded back onto the stack after the handler.
+  * {@code ASTORE}/{@code ALOAD} are used for this, so the value is treated as a reference.
+  */
+ private static class UserCodeMethodInvocation implements StackManipulation {
+
+   /** Local variable slot for the return value; {@code null} iff the target returns void. */
+   @Nullable private final Integer returnVarIndex;
+   private final MethodDescription targetMethod;
+   private final MethodDescription instrumentedMethod;
+   private final TypeDescription returnType;
+
+   // wrapStart/wrapEnd delimit the scope declared for the "res" local variable.
+   private final Label wrapStart = new Label();
+   private final Label wrapEnd = new Label();
+   // tryBlockStart/tryBlockEnd delimit the protected region; catchBlockStart is its handler and
+   // catchBlockEnd is where normal execution resumes after a successful invocation.
+   private final Label tryBlockStart = new Label();
+   private final Label tryBlockEnd = new Label();
+   private final Label catchBlockStart = new Label();
+   private final Label catchBlockEnd = new Label();
+
+   /** Description of {@code UserCodeException.wrap(Throwable)}. */
+   private final MethodDescription createUserCodeException;
+
+   /**
+    * @param returnVarIndex local variable slot used to hold the target's return value; must be
+    *     {@code null} if and only if the target method returns {@code void}
+    * @param targetMethod the user method to invoke
+    * @param instrumentedMethod the generated method into which this manipulation is emitted; its
+    *     receiver and parameter types describe the locals in the emitted stack map frames
+    * @throws IllegalArgumentException if {@code returnVarIndex} is inconsistent with the return
+    *     type of {@code targetMethod}
+    */
+   UserCodeMethodInvocation(
+       @Nullable Integer returnVarIndex,
+       MethodDescription targetMethod,
+       MethodDescription instrumentedMethod) {
+     this.returnVarIndex = returnVarIndex;
+     this.targetMethod = targetMethod;
+     this.instrumentedMethod = instrumentedMethod;
+     this.returnType = targetMethod.getReturnType().asErasure();
+
+     boolean targetMethodReturnsVoid = TypeDescription.VOID.equals(returnType);
+     checkArgument(
+         (returnVarIndex == null) == targetMethodReturnsVoid,
+         "returnVarIndex should be defined if and only if the target method has a return value");
+
+     try {
+       createUserCodeException =
+           new MethodDescription.ForLoadedMethod(
+               UserCodeException.class.getDeclaredMethod("wrap", Throwable.class));
+     } catch (NoSuchMethodException | SecurityException e) {
+       throw new RuntimeException("Unable to find UserCodeException.wrap", e);
+     }
+   }
+
+   @Override
+   public boolean isValid() {
+     return true;
+   }
+
+   /**
+    * Converts an ASM {@link Type} into the element used to describe it in a stack map frame
+    * passed to {@link MethodVisitor#visitFrame}: an internal name for reference types and one of
+    * the {@link Opcodes} frame constants for primitives.
+    *
+    * @throws IllegalArgumentException for sorts that cannot be a method argument (for example
+    *     {@code void})
+    */
+   private Object describeType(Type type) {
+     switch (type.getSort()) {
+       case Type.OBJECT:
+       case Type.ARRAY:
+         // For array types the internal name is the array descriptor, which is the form that
+         // stack map frames use for arrays.
+         return type.getInternalName();
+       case Type.INT:
+       case Type.BYTE:
+       case Type.BOOLEAN:
+       case Type.SHORT:
+       case Type.CHAR:
+         // All sub-int primitives, including char, are ints as far as frames are concerned.
+         return Opcodes.INTEGER;
+       case Type.LONG:
+         return Opcodes.LONG;
+       case Type.DOUBLE:
+         return Opcodes.DOUBLE;
+       case Type.FLOAT:
+         return Opcodes.FLOAT;
+       default:
+         throw new IllegalArgumentException("Unhandled type as method argument: " + type);
+     }
+   }
+
+   /**
+    * Emits a full ({@code F_NEW}) frame whose locals are the receiver and parameters of the
+    * instrumented method, optionally followed by the return-value local.
+    *
+    * @param localsIncludeReturn whether the return-value local (if there is one) is listed
+    * @param stackTop internal name of the single stack entry, or {@code null} for an empty stack
+    */
+   private void visitFrame(
+       MethodVisitor mv, boolean localsIncludeReturn, @Nullable String stackTop) {
+     boolean hasReturnLocal = (returnVarIndex != null) && localsIncludeReturn;
+
+     Type[] localTypes = Type.getArgumentTypes(instrumentedMethod.getDescriptor());
+     Object[] locals = new Object[1 + localTypes.length + (hasReturnLocal ? 1 : 0)];
+     // Local 0 is "this", followed by the method parameters.
+     locals[0] = instrumentedMethod.getReceiverType().asErasure().getInternalName();
+     for (int i = 0; i < localTypes.length; i++) {
+       locals[i + 1] = describeType(localTypes[i]);
+     }
+     if (hasReturnLocal) {
+       locals[locals.length - 1] = returnType.getInternalName();
+     }
+
+     Object[] stack = stackTop == null ? new Object[] {} : new Object[] {stackTop};
+
+     mv.visitFrame(Opcodes.F_NEW, locals.length, locals, stack.length, stack);
+   }
+
+   /**
+    * Emits the try/catch-wrapped invocation of the target method and reports the stack size
+    * change accumulated over the emitted instructions.
+    */
+   @Override
+   public Size apply(MethodVisitor mv, Context context) {
+     Size size = new Size(0, 0);
+
+     mv.visitLabel(wrapStart);
+
+     String throwableName = new TypeDescription.ForLoadedType(Throwable.class).getInternalName();
+     mv.visitTryCatchBlock(tryBlockStart, tryBlockEnd, catchBlockStart, throwableName);
+
+     // The try block attempts to perform the expected operations, then jumps to success
+     mv.visitLabel(tryBlockStart);
+     size = size.aggregate(MethodInvocation.invoke(targetMethod).apply(mv, context));
+
+     if (returnVarIndex != null) {
+       // Park the return value in its local so that the stack is empty at the jump target.
+       mv.visitVarInsn(Opcodes.ASTORE, returnVarIndex);
+       size = size.aggregate(new Size(-1, 0)); // Reduces the size of the stack
+     }
+     mv.visitJumpInsn(Opcodes.GOTO, catchBlockEnd);
+     mv.visitLabel(tryBlockEnd);
+
+     // The handler wraps the exception, and then throws.
+     mv.visitLabel(catchBlockStart);
+     // In catch block, should have same locals and {Throwable} on the stack.
+     visitFrame(mv, false, throwableName);
+
+     // Create the user code exception and throw
+     size =
+         size.aggregate(
+             new Compound(MethodInvocation.invoke(createUserCodeException), Throw.INSTANCE)
+                 .apply(mv, context));
+
+     mv.visitLabel(catchBlockEnd);
+
+     // At the jump target the return value (if any) is in its local and the stack is empty.
+     visitFrame(mv, true, null);
+
+     // Reload the return value so that it ends up on top of the stack for the code that follows.
+     if (returnVarIndex != null) {
+       mv.visitVarInsn(Opcodes.ALOAD, returnVarIndex);
+       size = size.aggregate(new Size(1, 0)); // Increases the size of the stack
+     }
+     mv.visitLabel(wrapEnd);
+     if (returnVarIndex != null) {
+       // Declare the "res" local, scoped from wrapStart to wrapEnd, for the return value slot.
+       mv.visitLocalVariable(
+           "res",
+           returnType.getDescriptor(),
+           returnType.getGenericSignature(),
+           wrapStart,
+           wrapEnd,
+           returnVarIndex);
+     }
+
+     return size;
+   }
+ }
+
+ /**
+  * The {@link Implementation} of the constructor of a generated {@link DoFnInvoker class}. The
+  * emitted byte code calls {@code Object}'s no-argument constructor and then stores the single
+  * constructor argument into the delegate field.
+  */
+ private static final class InvokerConstructor implements Implementation {
+   @Override
+   public InstrumentedType prepare(InstrumentedType instrumentedType) {
+     // Nothing to add to the instrumented type; the delegate field is defined by the builder.
+     return instrumentedType;
+   }
+
+   @Override
+   public ByteCodeAppender appender(final Target implementationTarget) {
+     return new ByteCodeAppender() {
+       @Override
+       public Size apply(
+           MethodVisitor methodVisitor,
+           Context implementationContext,
+           MethodDescription instrumentedMethod) {
+         // Local 0 is "this"; local 1 is the delegate argument.
+         StackManipulation loadThis = MethodVariableAccess.REFERENCE.loadOffset(0);
+         StackManipulation loadDelegateArgument = MethodVariableAccess.REFERENCE.loadOffset(1);
+
+         // super(): the default constructor of Object.
+         StackManipulation invokeObjectConstructor =
+             MethodInvocation.invoke(
+                 new TypeDescription.ForLoadedType(Object.class)
+                     .getDeclaredMethods()
+                     .filter(
+                         ElementMatchers.isConstructor().and(ElementMatchers.takesArguments(0)))
+                     .getOnly());
+
+         // this.delegate = <argument>
+         StackManipulation storeDelegateField =
+             FieldAccess.forField(
+                     implementationTarget
+                         .getInstrumentedType()
+                         .getDeclaredFields()
+                         .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
+                         .getOnly())
+                 .putter();
+
+         StackManipulation constructorBody =
+             new StackManipulation.Compound(
+                 loadThis,
+                 invokeObjectConstructor,
+                 loadThis,
+                 loadDelegateArgument,
+                 storeDelegateField,
+                 MethodReturn.VOID);
+
+         StackManipulation.Size size = constructorBody.apply(methodVisitor, implementationContext);
+         return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
+       }
+     };
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
new file mode 100644
index 0000000..5300a86
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.NamingStrategy;
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.description.modifier.FieldManifestation;
+import net.bytebuddy.description.modifier.Visibility;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.DynamicType;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.dynamic.scaffold.InstrumentedType;
+import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
+import net.bytebuddy.implementation.Implementation;
+import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
+import net.bytebuddy.implementation.bytecode.StackManipulation;
+import net.bytebuddy.implementation.bytecode.member.FieldAccess;
+import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import net.bytebuddy.implementation.bytecode.member.MethodReturn;
+import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
+import net.bytebuddy.jar.asm.MethodVisitor;
+import net.bytebuddy.matcher.ElementMatchers;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.OnTimer;
+import org.apache.beam.sdk.transforms.DoFn.TimerId;
+import org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory.DoFnMethodDelegation;
+
+/**
+ * Dynamically generates {@link OnTimerInvoker} instances for invoking a particular {@link TimerId}
+ * on a particular {@link DoFn}.
+ */
+class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory {
+
+ /**
+  * Returns an {@link OnTimerInvoker} for the given timer of the given {@link DoFn}, built by
+  * instantiating the cached constructor of the generated invoker class with {@code fn}.
+  *
+  * @throws RuntimeException if the constructor cannot be obtained from the cache or invoked
+  */
+ @Override
+ public <InputT, OutputT> OnTimerInvoker<InputT, OutputT> forTimer(
+     DoFn<InputT, OutputT> fn, String timerId) {
+
+   @SuppressWarnings("unchecked")
+   Class<? extends DoFn<?, ?>> doFnClass = (Class<? extends DoFn<?, ?>>) fn.getClass();
+
+   try {
+     // First level of the cache: per DoFn class. Second level: per timer id.
+     LoadingCache<String, Constructor<?>> constructorsByTimerId = constructorCache.get(doFnClass);
+     Constructor<?> invokerConstructor = constructorsByTimerId.get(timerId);
+
+     @SuppressWarnings("unchecked")
+     OnTimerInvoker<InputT, OutputT> onTimerInvoker =
+         (OnTimerInvoker<InputT, OutputT>) invokerConstructor.newInstance(fn);
+     return onTimerInvoker;
+   } catch (InstantiationException
+       | IllegalAccessException
+       | IllegalArgumentException
+       | InvocationTargetException
+       | SecurityException
+       | ExecutionException e) {
+     String message =
+         String.format(
+             "Unable to construct @%s invoker for %s",
+             OnTimer.class.getSimpleName(), fn.getClass().getName());
+     throw new RuntimeException(message, e);
+   }
+ }
+
+ /** The single instance of this factory, handed out by {@link #only()}. */
+ private static final ByteBuddyOnTimerInvokerFactory INSTANCE =
+     new ByteBuddyOnTimerInvokerFactory();
+
+ /** Private so that the only instance is the one returned by {@link #only()}. */
+ private ByteBuddyOnTimerInvokerFactory() {}
+
+ /** Returns the shared {@link ByteBuddyOnTimerInvokerFactory} instance. */
+ public static ByteBuddyOnTimerInvokerFactory only() {
+   return INSTANCE;
+ }
+
+ /**
+ * The name of the field, defined on every generated invoker class, that holds the {@link DoFn}
+ * instance the invoker delegates to. The generated constructor assigns its argument to this
+ * field.
+ */
+ private static final String FN_DELEGATE_FIELD_NAME = "delegate";
+
+ /**
+  * Constructors of generated {@link OnTimerInvoker} classes, cached first by {@link DoFn} class
+  * and then by {@link TimerId}.
+  *
+  * <p>The cache exists because generating an invoker class is expensive, and because it avoids
+  * generating an excessive number of classes that would consume PermGen memory on Java versions
+  * that still have PermGen.
+  */
+ private final LoadingCache<Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>>
+     constructorCache =
+         CacheBuilder.newBuilder()
+             .build(
+                 new CacheLoader<
+                     Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>>() {
+                   @Override
+                   public LoadingCache<String, Constructor<?>> load(
+                       Class<? extends DoFn<?, ?>> doFnClass) throws Exception {
+                     // Each DoFn class gets its own cache of constructors, keyed by timer id.
+                     OnTimerConstructorLoader perTimerLoader =
+                         new OnTimerConstructorLoader(doFnClass);
+                     return CacheBuilder.newBuilder().build(perTimerLoader);
+                   }
+                 });
+
+ /**
+ * A cache loader fixed to a particular {@link DoFn} class that loads constructors for the
+ * invokers for its {@link OnTimer @OnTimer} methods.
+ */
+ private static class OnTimerConstructorLoader extends CacheLoader<String, Constructor<?>> {
+
+ private final DoFnSignature signature;
+
+ public OnTimerConstructorLoader(Class<? extends DoFn<?, ?>> clazz) {
+ this.signature = DoFnSignatures.getSignature(clazz);
+ }
+
+ @Override
+ public Constructor<?> load(String timerId) throws Exception {
+ Class<? extends OnTimerInvoker<?, ?>> invokerClass =
+ generateOnTimerInvokerClass(signature, timerId);
+ try {
+ return invokerClass.getConstructor(signature.fnClass());
+ } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+  * Generates a {@link OnTimerInvoker} class for the given {@link DoFnSignature} and {@link
+  * TimerId}.
+  *
+  * <p>The generated class has a private final delegate field of the {@link DoFn} class, a public
+  * constructor that assigns it, and an {@code invokeOnTimer} method that calls the delegate's
+  * {@link OnTimer @OnTimer} method for {@code timerId}.
+  *
+  * @param signature the signature of the {@link DoFn} class to generate an invoker for
+  * @param timerId the id of the timer whose {@link OnTimer @OnTimer} method should be invoked
+  * @return the generated and loaded invoker class
+  * @throws IllegalArgumentException if {@code signature} has no {@link OnTimer @OnTimer} method
+  *     for {@code timerId}
+  */
+ private static Class<? extends OnTimerInvoker<?, ?>> generateOnTimerInvokerClass(
+     DoFnSignature signature, String timerId) {
+   Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
+
+   // Fail with a descriptive message rather than a NullPointerException from inside the
+   // delegation when the timer id does not correspond to any @OnTimer method.
+   DoFnSignature.OnTimerMethod onTimerMethod = signature.onTimerMethods().get(timerId);
+   if (onTimerMethod == null) {
+     throw new IllegalArgumentException(
+         String.format(
+             "%s has no @%s method for timer id \"%s\"",
+             fnClass.getName(), OnTimer.class.getSimpleName(), timerId));
+   }
+
+   final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass);
+
+   // Only letters and digits of the timer id are kept, so the suffix is a valid identifier part.
+   final String className =
+       "auxiliary_OnTimer_" + CharMatcher.JAVA_LETTER_OR_DIGIT.retainFrom(timerId);
+
+   DynamicType.Builder<?> builder =
+       new ByteBuddy()
+           // Create subclasses inside the target class, to have access to
+           // private and package-private bits
+           .with(
+               new NamingStrategy.SuffixingRandom(className) {
+                 @Override
+                 public String subclass(TypeDescription.Generic superClass) {
+                   return super.name(clazzDescription);
+                 }
+               })
+           // class <invoker class> implements OnTimerInvoker {
+           .subclass(OnTimerInvoker.class, ConstructorStrategy.Default.NO_CONSTRUCTORS)
+
+           //   private final <fn class> delegate;
+           .defineField(
+               FN_DELEGATE_FIELD_NAME, fnClass, Visibility.PRIVATE, FieldManifestation.FINAL)
+
+           //   <invoker class>(<fn class> delegate) { this.delegate = delegate; }
+           .defineConstructor(Visibility.PUBLIC)
+           .withParameter(fnClass)
+           .intercept(new InvokerConstructor())
+
+           //   public invokeOnTimer(ExtraContextFactory) {
+           //     this.delegate.<@OnTimer method>(... pass the right args ...)
+           //   }
+           .method(ElementMatchers.named("invokeOnTimer"))
+           .intercept(new InvokeOnTimerDelegation(onTimerMethod));
+
+   DynamicType.Unloaded<?> unloaded = builder.make();
+
+   @SuppressWarnings("unchecked")
+   Class<? extends OnTimerInvoker<?, ?>> res =
+       (Class<? extends OnTimerInvoker<?, ?>>)
+           unloaded
+               .load(
+                   ByteBuddyOnTimerInvokerFactory.class.getClassLoader(),
+                   ClassLoadingStrategy.Default.INJECTION)
+               .getLoaded();
+   return res;
+ }
+
+ /**
+  * Implements the generated {@code invokeOnTimer} method by delegating to an {@link OnTimer
+  * @OnTimer} method. This is akin to the @ProcessElement delegation, but simpler because no
+  * splitting-related parameters need to be handled.
+  */
+ private static class InvokeOnTimerDelegation extends DoFnMethodDelegation {
+
+   /** Signature of the {@link OnTimer @OnTimer} method being delegated to. */
+   private final DoFnSignature.OnTimerMethod signature;
+
+   public InvokeOnTimerDelegation(DoFnSignature.OnTimerMethod signature) {
+     super(signature.targetMethod());
+     this.signature = signature;
+   }
+
+   /**
+    * Pushes the arguments of the wrapped method. The wrapper receives only an
+    * ExtraContextFactory; the wrapped method takes a dynamic set of allowed "extra" parameters,
+    * in any order, subject to validation prior to getting the DoFnSignature.
+    */
+   @Override
+   protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
+     // The ExtraContextFactory is local variable 1 of the wrapper method.
+     StackManipulation loadExtraContextFactory = MethodVariableAccess.REFERENCE.loadOffset(1);
+
+     // One push per extra parameter, in the order the target method declares them.
+     ArrayList<StackManipulation> argumentPushes = new ArrayList<>();
+     for (DoFnSignature.Parameter extraParameter : signature.extraParameters()) {
+       StackManipulation pushArgument =
+           ByteBuddyDoFnInvokerFactory.getExtraContextParameter(
+               extraParameter, loadExtraContextFactory);
+       argumentPushes.add(pushArgument);
+     }
+     return new StackManipulation.Compound(argumentPushes);
+   }
+ }
+
+ /**
+  * The {@link Implementation} of the constructor of a generated invoker class. The emitted byte
+  * code calls {@code Object}'s no-argument constructor and then stores the single constructor
+  * argument into the delegate field.
+  */
+ private static final class InvokerConstructor implements Implementation {
+   @Override
+   public InstrumentedType prepare(InstrumentedType instrumentedType) {
+     // Nothing to add to the instrumented type; the delegate field is defined by the builder.
+     return instrumentedType;
+   }
+
+   @Override
+   public ByteCodeAppender appender(final Target implementationTarget) {
+     return new ByteCodeAppender() {
+       @Override
+       public Size apply(
+           MethodVisitor methodVisitor,
+           Context implementationContext,
+           MethodDescription instrumentedMethod) {
+         // Local 0 is "this"; local 1 is the delegate argument.
+         StackManipulation loadThis = MethodVariableAccess.REFERENCE.loadOffset(0);
+         StackManipulation loadDelegateArgument = MethodVariableAccess.REFERENCE.loadOffset(1);
+
+         // super(): the default constructor of Object.
+         StackManipulation invokeObjectConstructor =
+             MethodInvocation.invoke(
+                 new TypeDescription.ForLoadedType(Object.class)
+                     .getDeclaredMethods()
+                     .filter(
+                         ElementMatchers.isConstructor().and(ElementMatchers.takesArguments(0)))
+                     .getOnly());
+
+         // this.delegate = <argument>
+         StackManipulation storeDelegateField =
+             FieldAccess.forField(
+                     implementationTarget
+                         .getInstrumentedType()
+                         .getDeclaredFields()
+                         .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
+                         .getOnly())
+                 .putter();
+
+         StackManipulation constructorBody =
+             new StackManipulation.Compound(
+                 loadThis,
+                 invokeObjectConstructor,
+                 loadThis,
+                 loadDelegateArgument,
+                 storeDelegateField,
+                 MethodReturn.VOID);
+
+         StackManipulation.Size size = constructorBody.apply(methodVisitor, implementationContext);
+         return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
+       }
+     };
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokerFactory.java
new file mode 100644
index 0000000..a54f2bc
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokerFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+/** A factory for providing a {@link DoFnInvoker} for invoking a {@link DoFn}. */
+interface DoFnInvokerFactory {
+
+ /**
+ * Creates a {@link DoFnInvoker} for the given {@link DoFn}.
+ *
+ * @param doFn the {@link DoFn} that the returned invoker will invoke
+ * @param <InputT> the input element type of {@code doFn}
+ * @param <OutputT> the output element type of {@code doFn}
+ * @return a {@link DoFnInvoker} with the same input and output types as {@code doFn}
+ */
+ <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(DoFn<InputT, OutputT> doFn);
+}