You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/08 02:42:43 UTC

[2/2] incubator-beam git commit: Add OnTimerInvoker(s), for invoking DoFn @OnTimer methods

Add OnTimerInvoker(s), for invoking DoFn @OnTimer methods

OnTimerInvoker encapsulates the dispatch from onTimer(<timer ID>)
to a call to the DoFn method annotated with @OnTimer(<timer ID>).


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/42f5251d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/42f5251d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/42f5251d

Branch: refs/heads/master
Commit: 42f5251dab3874471fc6a4c5ad813932b65ff703
Parents: 99505e1
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 31 19:26:38 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Nov 7 18:30:18 2016 -0800

----------------------------------------------------------------------
 .../sdk/transforms/reflect/DoFnInvokers.java    |   4 +-
 .../sdk/transforms/reflect/OnTimerInvoker.java  |  27 ++
 .../sdk/transforms/reflect/OnTimerInvokers.java | 271 +++++++++++++++++++
 .../transforms/reflect/OnTimerInvokersTest.java | 109 ++++++++
 4 files changed, 409 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42f5251d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index b975711..086ae7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -377,7 +377,7 @@ public class DoFnInvokers {
    * Implements a method of {@link DoFnInvoker} (the "instrumented method") by delegating to a
    * "target method" of the wrapped {@link DoFn}.
    */
-  private static class DoFnMethodDelegation implements Implementation {
+  static class DoFnMethodDelegation implements Implementation {
     /** The {@link MethodDescription} of the wrapped {@link DoFn}'s method. */
     protected final MethodDescription targetMethod;
     /** Whether the target method returns non-void. */
@@ -529,7 +529,7 @@ public class DoFnInvokers {
         MethodInvocation.invoke(getExtraContextFactoryMethodDescription(methodName)));
   }
 
-  private static StackManipulation getExtraContextParameter(
+  static StackManipulation getExtraContextParameter(
       DoFnSignature.Parameter parameter,
       final StackManipulation pushExtraContextFactory) {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42f5251d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
new file mode 100644
index 0000000..de9d667
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+/** Interface for invoking the {@link DoFn.OnTimer} method for a particular timer. */
+interface OnTimerInvoker<InputT, OutputT> {
+
+  /** Invoke the {@link DoFn.OnTimer} method in the provided context. */
+  void invokeOnTimer(DoFn.ExtraContextFactory<InputT, OutputT> extra);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42f5251d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java
new file mode 100644
index 0000000..b2bace2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.NamingStrategy;
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.description.modifier.FieldManifestation;
+import net.bytebuddy.description.modifier.Visibility;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.DynamicType;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.dynamic.scaffold.InstrumentedType;
+import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
+import net.bytebuddy.implementation.Implementation;
+import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
+import net.bytebuddy.implementation.bytecode.StackManipulation;
+import net.bytebuddy.implementation.bytecode.member.FieldAccess;
+import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import net.bytebuddy.implementation.bytecode.member.MethodReturn;
+import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
+import net.bytebuddy.jar.asm.MethodVisitor;
+import net.bytebuddy.matcher.ElementMatchers;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.OnTimer;
+import org.apache.beam.sdk.transforms.DoFn.TimerId;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers.DoFnMethodDelegation;
+
+/**
+ * Dynamically generates {@link OnTimerInvoker} instances for invoking a particular {@link TimerId}
+ * on a particular {@link DoFn}.
+ */
+class OnTimerInvokers {
+  public static final OnTimerInvokers INSTANCE = new OnTimerInvokers();
+
+  private OnTimerInvokers() {}
+
+  /**
+   * The field name for the delegate of {@link DoFn} subclass that a bytebuddy invoker will call.
+   */
+  private static final String FN_DELEGATE_FIELD_NAME = "delegate";
+
+  /**
+   * A cache of constructors of generated {@link OnTimerInvoker} classes, keyed by {@link DoFn}
+   * class and then by {@link TimerId}.
+   *
+   * <p>Needed because generating an invoker class is expensive, and to avoid generating an
+   * excessive number of classes consuming PermGen memory in JVMs that still have PermGen.
+   */
+  private final LoadingCache<Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>>
+      constructorCache =
+          CacheBuilder.newBuilder()
+              .build(
+                  new CacheLoader<
+                      Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>>() {
+                    @Override
+                    public LoadingCache<String, Constructor<?>> load(
+                        final Class<? extends DoFn<?, ?>> fnClass) throws Exception {
+                      return CacheBuilder.newBuilder().build(new OnTimerConstructorLoader(fnClass));
+                    }
+                  });
+
+  /** Creates an invoker for the {@code @OnTimer} method of {@code fn} with ID {@code timerId}. */
+  public <InputT, OutputT> OnTimerInvoker<InputT, OutputT> forTimer(
+      DoFn<InputT, OutputT> fn, String timerId) {
+
+    @SuppressWarnings("unchecked")
+    Class<? extends DoFn<?, ?>> fnClass = (Class<? extends DoFn<?, ?>>) fn.getClass();
+
+    try {
+      Constructor<?> constructor = constructorCache.get(fnClass).get(timerId);
+      @SuppressWarnings("unchecked")
+      OnTimerInvoker<InputT, OutputT> invoker =
+          (OnTimerInvoker<InputT, OutputT>) constructor.newInstance(fn);
+      return invoker;
+    } catch (InstantiationException
+        | IllegalAccessException
+        | IllegalArgumentException
+        | InvocationTargetException
+        | SecurityException
+        | ExecutionException e) {
+      throw new RuntimeException(
+          String.format(
+              "Unable to construct @%s invoker for %s",
+              DoFn.OnTimer.class.getSimpleName(), fn.getClass().getName()),
+          e);
+    }
+  }
+
+  /**
+   * A cache loader fixed to a particular {@link DoFn} class that loads constructors for the
+   * invokers for its {@link OnTimer @OnTimer} methods.
+   */
+  private static class OnTimerConstructorLoader extends CacheLoader<String, Constructor<?>> {
+
+    private final DoFnSignature signature;
+
+    public OnTimerConstructorLoader(Class<? extends DoFn<?, ?>> clazz) {
+      this.signature = DoFnSignatures.INSTANCE.getSignature(clazz);
+    }
+
+    @Override
+    public Constructor<?> load(String timerId) throws Exception {
+      Class<? extends OnTimerInvoker<?, ?>> invokerClass =
+          generateOnTimerInvokerClass(signature, timerId);
+      try {
+        return invokerClass.getConstructor(signature.fnClass());
+      } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Generates a {@link OnTimerInvoker} class for the given {@link DoFnSignature} and {@link
+   * TimerId}.
+   */
+  private static Class<? extends OnTimerInvoker<?, ?>> generateOnTimerInvokerClass(
+      DoFnSignature signature, String timerId) {
+    Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
+
+    final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass);
+
+    final String className =
+        "auxiliary_OnTimer_" + CharMatcher.JAVA_LETTER_OR_DIGIT.retainFrom(timerId);
+
+    DynamicType.Builder<?> builder =
+        new ByteBuddy()
+            // Create subclasses inside the target class, to have access to
+            // private and package-private bits
+            .with(
+                new NamingStrategy.SuffixingRandom(className) {
+                  @Override
+                  public String subclass(TypeDescription.Generic superClass) {
+                    return super.name(clazzDescription);
+                  }
+                })
+            // class <invoker class> implements OnTimerInvoker {
+            .subclass(OnTimerInvoker.class, ConstructorStrategy.Default.NO_CONSTRUCTORS)
+
+            //   private final <fn class> delegate;
+            .defineField(
+                FN_DELEGATE_FIELD_NAME, fnClass, Visibility.PRIVATE, FieldManifestation.FINAL)
+
+            //   <invoker class>(<fn class> delegate) { this.delegate = delegate; }
+            .defineConstructor(Visibility.PUBLIC)
+            .withParameter(fnClass)
+            .intercept(new InvokerConstructor())
+
+            //   public void invokeOnTimer(ExtraContextFactory) {
+            //     this.delegate.<@OnTimer method>(... pass the right args ...)
+            //   }
+            .method(ElementMatchers.named("invokeOnTimer"))
+            .intercept(new InvokeOnTimerDelegation(signature.onTimerMethods().get(timerId)));
+
+    DynamicType.Unloaded<?> unloaded = builder.make();
+
+    @SuppressWarnings("unchecked")
+    Class<? extends OnTimerInvoker<?, ?>> res =
+        (Class<? extends OnTimerInvoker<?, ?>>)
+            unloaded
+                .load(
+                    OnTimerInvokers.class.getClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+                .getLoaded();
+    return res;
+  }
+
+  /**
+   * An "invokeOnTimer" method implementation akin to @ProcessElement, but simpler because no
+   * splitting-related parameters need to be handled.
+   */
+  private static class InvokeOnTimerDelegation extends DoFnMethodDelegation {
+
+    private final DoFnSignature.OnTimerMethod signature;
+
+    public InvokeOnTimerDelegation(DoFnSignature.OnTimerMethod signature) {
+      super(signature.targetMethod());
+      this.signature = signature;
+    }
+
+    @Override
+    protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
+      // Parameters of the wrapper invoker method:
+      //   ExtraContextFactory.
+      // Parameters of the wrapped DoFn method:
+      //   a dynamic set of allowed "extra" parameters in any order subject to
+      //   validation prior to getting the DoFnSignature
+      ArrayList<StackManipulation> parameters = new ArrayList<>();
+      // Push the extra arguments in their actual order.
+      StackManipulation pushExtraContextFactory = MethodVariableAccess.REFERENCE.loadOffset(1);
+      for (DoFnSignature.Parameter param : signature.extraParameters()) {
+        parameters.add(DoFnInvokers.getExtraContextParameter(param, pushExtraContextFactory));
+      }
+      return new StackManipulation.Compound(parameters);
+    }
+  }
+
+  /**
+   * A constructor {@link Implementation} for an {@link OnTimerInvoker} class. Produces the
+   * byte code for a constructor that takes a single argument and assigns it to the delegate field.
+   */
+  private static final class InvokerConstructor implements Implementation {
+    @Override
+    public InstrumentedType prepare(InstrumentedType instrumentedType) {
+      return instrumentedType;
+    }
+
+    @Override
+    public ByteCodeAppender appender(final Target implementationTarget) {
+      return new ByteCodeAppender() {
+        @Override
+        public Size apply(
+            MethodVisitor methodVisitor,
+            Context implementationContext,
+            MethodDescription instrumentedMethod) {
+          StackManipulation.Size size =
+              new StackManipulation.Compound(
+                      // Load the this reference
+                      MethodVariableAccess.REFERENCE.loadOffset(0),
+                      // Invoke the super constructor (default constructor of Object)
+                      MethodInvocation.invoke(
+                          new TypeDescription.ForLoadedType(Object.class)
+                              .getDeclaredMethods()
+                              .filter(
+                                  ElementMatchers.isConstructor()
+                                      .and(ElementMatchers.takesArguments(0)))
+                              .getOnly()),
+                      // Load the this reference
+                      MethodVariableAccess.REFERENCE.loadOffset(0),
+                      // Load the delegate argument
+                      MethodVariableAccess.REFERENCE.loadOffset(1),
+                      // Assign the delegate argument to the delegate field
+                      FieldAccess.forField(
+                              implementationTarget
+                                  .getInstrumentedType()
+                                  .getDeclaredFields()
+                                  .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
+                                  .getOnly())
+                          .putter(),
+                      // Return void.
+                      MethodReturn.VOID)
+                  .apply(methodVisitor, implementationContext);
+          return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42f5251d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
new file mode 100644
index 0000000..f8275fa
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link OnTimerInvokers}. */
+@RunWith(JUnit4.class)
+public class OnTimerInvokersTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Mock private BoundedWindow mockWindow;
+
+  @Mock private ExtraContextFactory<String, String> mockExtraContextFactory;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    when(mockExtraContextFactory.window()).thenReturn(mockWindow);
+  }
+
+  private void invokeOnTimer(DoFn<String, String> fn, String timerId) {
+    OnTimerInvokers.INSTANCE.forTimer(fn, timerId).invokeOnTimer(mockExtraContextFactory);
+  }
+
+  @Test
+  public void testOnTimerHelloWord() throws Exception {
+    final String timerId = "my-timer-id";
+
+    class SimpleTimerDoFn extends DoFn<String, String> {
+
+      public String status = "not yet";
+
+      @TimerId(timerId)
+      private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+      @ProcessElement
+      public void process(ProcessContext c) {}
+
+      @OnTimer(timerId)
+      public void onMyTimer() {
+        status = "OK now";
+      }
+    }
+
+    SimpleTimerDoFn fn = new SimpleTimerDoFn();
+
+    invokeOnTimer(fn, timerId);
+    assertThat(fn.status, equalTo("OK now"));
+  }
+
+  @Test
+  public void testOnTimerWithWindow() throws Exception {
+    WindowedTimerDoFn fn = new WindowedTimerDoFn();
+
+    invokeOnTimer(fn, WindowedTimerDoFn.TIMER_ID);
+    assertThat(fn.window, theInstance(mockWindow));
+  }
+
+  private static class WindowedTimerDoFn extends DoFn<String, String> {
+    public static final String TIMER_ID = "my-timer-id";
+
+    public BoundedWindow window = null;
+
+    @TimerId(TIMER_ID)
+    private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    @ProcessElement
+    public void process(ProcessContext c) {}
+
+    @OnTimer(TIMER_ID)
+    public void onMyTimer(BoundedWindow window) {
+      this.window = window;
+    }
+  }
+}