You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/08 02:42:43 UTC
[2/2] incubator-beam git commit: Add OnTimerInvoker(s),
for invoking DoFn @OnTimer methods
Add OnTimerInvoker(s), for invoking DoFn @OnTimer methods
OnTimerInvoker encapsulates the dispatch from onTimer(<timer ID>)
to a call to the DoFn method annotated with @OnTimer(<timer ID>).
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/42f5251d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/42f5251d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/42f5251d
Branch: refs/heads/master
Commit: 42f5251dab3874471fc6a4c5ad813932b65ff703
Parents: 99505e1
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 31 19:26:38 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Nov 7 18:30:18 2016 -0800
----------------------------------------------------------------------
.../sdk/transforms/reflect/DoFnInvokers.java | 4 +-
.../sdk/transforms/reflect/OnTimerInvoker.java | 27 ++
.../sdk/transforms/reflect/OnTimerInvokers.java | 271 +++++++++++++++++++
.../transforms/reflect/OnTimerInvokersTest.java | 109 ++++++++
4 files changed, 409 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42f5251d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index b975711..086ae7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -377,7 +377,7 @@ public class DoFnInvokers {
* Implements a method of {@link DoFnInvoker} (the "instrumented method") by delegating to a
* "target method" of the wrapped {@link DoFn}.
*/
- private static class DoFnMethodDelegation implements Implementation {
+ static class DoFnMethodDelegation implements Implementation {
/** The {@link MethodDescription} of the wrapped {@link DoFn}'s method. */
protected final MethodDescription targetMethod;
/** Whether the target method returns non-void. */
@@ -529,7 +529,7 @@ public class DoFnInvokers {
MethodInvocation.invoke(getExtraContextFactoryMethodDescription(methodName)));
}
- private static StackManipulation getExtraContextParameter(
+ static StackManipulation getExtraContextParameter(
DoFnSignature.Parameter parameter,
final StackManipulation pushExtraContextFactory) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42f5251d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
new file mode 100644
index 0000000..de9d667
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+/** Invokes the {@link DoFn.OnTimer} method that a {@link DoFn} declares for one particular timer. */
+interface OnTimerInvoker<InputT, OutputT> {
+
+ /** Invokes the {@link DoFn.OnTimer} method, using {@code extra} as the source of its arguments. */
+ void invokeOnTimer(DoFn.ExtraContextFactory<InputT, OutputT> extra);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42f5251d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java
new file mode 100644
index 0000000..b2bace2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.NamingStrategy;
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.description.modifier.FieldManifestation;
+import net.bytebuddy.description.modifier.Visibility;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.DynamicType;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.dynamic.scaffold.InstrumentedType;
+import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
+import net.bytebuddy.implementation.Implementation;
+import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
+import net.bytebuddy.implementation.bytecode.StackManipulation;
+import net.bytebuddy.implementation.bytecode.member.FieldAccess;
+import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import net.bytebuddy.implementation.bytecode.member.MethodReturn;
+import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
+import net.bytebuddy.jar.asm.MethodVisitor;
+import net.bytebuddy.matcher.ElementMatchers;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.OnTimer;
+import org.apache.beam.sdk.transforms.DoFn.TimerId;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers.DoFnMethodDelegation;
+
+/**
+ * Dynamically generates {@link OnTimerInvoker} instances for invoking a particular {@link TimerId}
+ * on a particular {@link DoFn}.
+ */
+class OnTimerInvokers {
+ public static final OnTimerInvokers INSTANCE = new OnTimerInvokers();
+
+ private OnTimerInvokers() {}
+
+ /**
+ * The field name for the delegate of {@link DoFn} subclass that a bytebuddy invoker will call.
+ */
+ private static final String FN_DELEGATE_FIELD_NAME = "delegate";
+
+ /**
+ * A two-level cache of constructors of generated {@link OnTimerInvoker} classes: the outer cache
+ * is keyed by {@link DoFn} class, the inner one by {@link TimerId} string.
+ *
+ * <p>Needed because generating an invoker class is expensive, and to avoid generating an
+ * excessive number of classes consuming PermGen memory on Java versions that still have PermGen.
+ */
+ private final LoadingCache<Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>>
+ constructorCache =
+ CacheBuilder.newBuilder()
+ .build(
+ new CacheLoader<
+ Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>>() {
+ @Override
+ public LoadingCache<String, Constructor<?>> load(
+ final Class<? extends DoFn<?, ?>> fnClass) throws Exception {
+ return CacheBuilder.newBuilder().build(new OnTimerConstructorLoader(fnClass));
+ }
+ });
+
+ /** Returns an {@link OnTimerInvoker} wrapping {@code fn} for the timer {@code timerId}; reflection and cache-loading failures are rethrown wrapped in a {@link RuntimeException}. */
+ public <InputT, OutputT> OnTimerInvoker<InputT, OutputT> forTimer(
+ DoFn<InputT, OutputT> fn, String timerId) {
+
+ @SuppressWarnings("unchecked")
+ Class<? extends DoFn<?, ?>> fnClass = (Class<? extends DoFn<?, ?>>) fn.getClass();
+
+ try {
+ Constructor<?> constructor = constructorCache.get(fnClass).get(timerId); // loaded on first request, cached afterwards
+ @SuppressWarnings("unchecked")
+ OnTimerInvoker<InputT, OutputT> invoker =
+ (OnTimerInvoker<InputT, OutputT>) constructor.newInstance(fn);
+ return invoker;
+ } catch (InstantiationException
+ | IllegalAccessException
+ | IllegalArgumentException
+ | InvocationTargetException
+ | SecurityException
+ | ExecutionException e) {
+ throw new RuntimeException(
+ String.format(
+ "Unable to construct @%s invoker for %s",
+ DoFn.OnTimer.class.getSimpleName(), fn.getClass().getName()),
+ e);
+ }
+ }
+
+ /**
+ * A cache loader fixed to a particular {@link DoFn} class that, given a timer ID, generates the
+ * invoker class for the matching {@link OnTimer @OnTimer} method and returns its constructor.
+ */
+ private static class OnTimerConstructorLoader extends CacheLoader<String, Constructor<?>> {
+
+ private final DoFnSignature signature;
+
+ public OnTimerConstructorLoader(Class<? extends DoFn<?, ?>> clazz) {
+ this.signature = DoFnSignatures.INSTANCE.getSignature(clazz);
+ }
+
+ @Override
+ public Constructor<?> load(String timerId) throws Exception {
+ Class<? extends OnTimerInvoker<?, ?>> invokerClass =
+ generateOnTimerInvokerClass(signature, timerId);
+ try {
+ return invokerClass.getConstructor(signature.fnClass()); // the generated public constructor takes the DoFn delegate
+ } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Generates and loads an {@link OnTimerInvoker} class that wraps the {@link DoFn} class of the
+ * given {@link DoFnSignature} and dispatches to its method for the given {@link TimerId}.
+ */
+ private static Class<? extends OnTimerInvoker<?, ?>> generateOnTimerInvokerClass(
+ DoFnSignature signature, String timerId) {
+ Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
+
+ final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass);
+
+ final String className =
+ "auxiliary_OnTimer_" + CharMatcher.JAVA_LETTER_OR_DIGIT.retainFrom(timerId); // keeps only the letters and digits of the timer ID
+
+ DynamicType.Builder<?> builder =
+ new ByteBuddy()
+ // Name the generated class after the DoFn class (not after OnTimerInvoker), with the
+ // intent of giving it access to the DoFn's private and package-private bits
+ .with(
+ new NamingStrategy.SuffixingRandom(className) {
+ @Override
+ public String subclass(TypeDescription.Generic superClass) {
+ return super.name(clazzDescription);
+ }
+ })
+ // class <invoker class> implements OnTimerInvoker {
+ .subclass(OnTimerInvoker.class, ConstructorStrategy.Default.NO_CONSTRUCTORS)
+
+ // private final <fn class> delegate;
+ .defineField(
+ FN_DELEGATE_FIELD_NAME, fnClass, Visibility.PRIVATE, FieldManifestation.FINAL)
+
+ // public <invoker class>(<fn class> delegate) { this.delegate = delegate; }
+ .defineConstructor(Visibility.PUBLIC)
+ .withParameter(fnClass)
+ .intercept(new InvokerConstructor())
+
+ // public void invokeOnTimer(ExtraContextFactory extra) {
+ // this.delegate.<@OnTimer method>(... pass the right args ...)
+ // }
+ .method(ElementMatchers.named("invokeOnTimer"))
+ .intercept(new InvokeOnTimerDelegation(signature.onTimerMethods().get(timerId)));
+
+ DynamicType.Unloaded<?> unloaded = builder.make();
+
+ @SuppressWarnings("unchecked")
+ Class<? extends OnTimerInvoker<?, ?>> res =
+ (Class<? extends OnTimerInvoker<?, ?>>)
+ unloaded
+ .load(
+ OnTimerInvokers.class.getClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+ .getLoaded();
+ return res;
+ }
+
+ /**
+ * An "invokeOnTimer" method implementation akin to @ProcessElement, but simpler because no
+ * splitting-related parameters need to be handled.
+ */
+ private static class InvokeOnTimerDelegation extends DoFnMethodDelegation {
+
+ private final DoFnSignature.OnTimerMethod signature;
+
+ public InvokeOnTimerDelegation(DoFnSignature.OnTimerMethod signature) {
+ super(signature.targetMethod());
+ this.signature = signature;
+ }
+
+ @Override
+ protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
+ // Parameters of the wrapper invoker method:
+ // ExtraContextFactory.
+ // Parameters of the wrapped DoFn method:
+ // a dynamic set of allowed "extra" parameters in any order subject to
+ // validation prior to getting the DoFnSignature
+ ArrayList<StackManipulation> parameters = new ArrayList<>();
+ // Push the extra arguments in the order the @OnTimer method declares them.
+ StackManipulation pushExtraContextFactory = MethodVariableAccess.REFERENCE.loadOffset(1); // slot 1: the ExtraContextFactory argument
+ for (DoFnSignature.Parameter param : signature.extraParameters()) {
+ parameters.add(DoFnInvokers.getExtraContextParameter(param, pushExtraContextFactory));
+ }
+ return new StackManipulation.Compound(parameters);
+ }
+ }
+
+ /**
+ * A constructor {@link Implementation} for a generated {@link OnTimerInvoker} class. Produces the
+ * byte code for a constructor that takes a single argument and assigns it to the delegate field.
+ */
+ private static final class InvokerConstructor implements Implementation {
+ @Override
+ public InstrumentedType prepare(InstrumentedType instrumentedType) {
+ return instrumentedType;
+ }
+
+ @Override
+ public ByteCodeAppender appender(final Target implementationTarget) {
+ return new ByteCodeAppender() {
+ @Override
+ public Size apply(
+ MethodVisitor methodVisitor,
+ Context implementationContext,
+ MethodDescription instrumentedMethod) {
+ StackManipulation.Size size =
+ new StackManipulation.Compound(
+ // Load the this reference
+ MethodVariableAccess.REFERENCE.loadOffset(0),
+ // Invoke the super constructor (default constructor of Object)
+ MethodInvocation.invoke(
+ new TypeDescription.ForLoadedType(Object.class)
+ .getDeclaredMethods()
+ .filter(
+ ElementMatchers.isConstructor()
+ .and(ElementMatchers.takesArguments(0)))
+ .getOnly()),
+ // Load the this reference again, as the target of the field write
+ MethodVariableAccess.REFERENCE.loadOffset(0),
+ // Load the delegate argument
+ MethodVariableAccess.REFERENCE.loadOffset(1),
+ // Assign the delegate argument to the delegate field
+ FieldAccess.forField(
+ implementationTarget
+ .getInstrumentedType()
+ .getDeclaredFields()
+ .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
+ .getOnly())
+ .putter(),
+ // Return void.
+ MethodReturn.VOID)
+ .apply(methodVisitor, implementationContext);
+ return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
+ }
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42f5251d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
new file mode 100644
index 0000000..f8275fa
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link OnTimerInvokers}. */
+@RunWith(JUnit4.class)
+public class OnTimerInvokersTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Mock private BoundedWindow mockWindow;
+
+ @Mock private ExtraContextFactory<String, String> mockExtraContextFactory;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ when(mockExtraContextFactory.window()).thenReturn(mockWindow); // the window handed to @OnTimer methods that request one
+ }
+
+ private void invokeOnTimer(DoFn<String, String> fn, String timerId) {
+ OnTimerInvokers.INSTANCE.forTimer(fn, timerId).invokeOnTimer(mockExtraContextFactory);
+ }
+
+ @Test
+ public void testOnTimerHelloWord() throws Exception {
+ final String timerId = "my-timer-id";
+
+ class SimpleTimerDoFn extends DoFn<String, String> {
+
+ public String status = "not yet";
+
+ @TimerId(timerId)
+ private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @ProcessElement
+ public void process(ProcessContext c) {}
+
+ @OnTimer(timerId)
+ public void onMyTimer() {
+ status = "OK now";
+ }
+ }
+
+ SimpleTimerDoFn fn = new SimpleTimerDoFn();
+
+ invokeOnTimer(fn, timerId);
+ assertThat(fn.status, equalTo("OK now")); // changed only if the invoker actually called onMyTimer()
+ }
+
+ @Test
+ public void testOnTimerWithWindow() throws Exception {
+ WindowedTimerDoFn fn = new WindowedTimerDoFn();
+
+ invokeOnTimer(fn, WindowedTimerDoFn.TIMER_ID);
+ assertThat(fn.window, theInstance(mockWindow)); // the window must come from the ExtraContextFactory
+ }
+
+ private static class WindowedTimerDoFn extends DoFn<String, String> {
+ public static final String TIMER_ID = "my-timer-id";
+
+ public BoundedWindow window = null;
+
+ @TimerId(TIMER_ID)
+ private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @ProcessElement
+ public void process(ProcessContext c) {}
+
+ @OnTimer(TIMER_ID)
+ public void onMyTimer(BoundedWindow window) {
+ this.window = window;
+ }
+ }
+}