You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/10 04:50:06 UTC

[36/43] beam git commit: Simplified ByteBuddyOnTimerInvokerFactory

Simplified ByteBuddyOnTimerInvokerFactory


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c8d98336
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c8d98336
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c8d98336

Branch: refs/heads/gearpump-runner
Commit: c8d983363efd3f3d93825ecc8e8abae2dfa4e008
Parents: 17bc3b1
Author: Innocent Djiofack <dj...@gmail.com>
Authored: Wed Jun 28 22:15:11 2017 -0400
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 6 21:46:53 2017 -0700

----------------------------------------------------------------------
 .../reflect/ByteBuddyOnTimerInvokerFactory.java | 73 ++++++++------------
 .../reflect/OnTimerMethodSpecifier.java         | 37 ++++++++++
 2 files changed, 65 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c8d98336/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
index e031337..5e31f2e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
+
 import com.google.common.base.CharMatcher;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -61,13 +62,14 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory {
 
     @SuppressWarnings("unchecked")
     Class<? extends DoFn<?, ?>> fnClass = (Class<? extends DoFn<?, ?>>) fn.getClass();
-
     try {
-      Constructor<?> constructor = constructorCache.get(fnClass).get(timerId);
-      @SuppressWarnings("unchecked")
-      OnTimerInvoker<InputT, OutputT> invoker =
+        OnTimerMethodSpecifier onTimerMethodSpecifier =
+                OnTimerMethodSpecifier.forClassAndTimerId(fnClass, timerId);
+        Constructor<?> constructor = constructorCache.get(onTimerMethodSpecifier);
+
+        OnTimerInvoker<InputT, OutputT> invoker =
           (OnTimerInvoker<InputT, OutputT>) constructor.newInstance(fn);
-      return invoker;
+        return invoker;
     } catch (InstantiationException
         | IllegalAccessException
         | IllegalArgumentException
@@ -97,50 +99,31 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory {
   private static final String FN_DELEGATE_FIELD_NAME = "delegate";
 
   /**
-   * A cache of constructors of generated {@link OnTimerInvoker} classes, keyed by {@link DoFn}
-   * class and then by {@link TimerId}.
+   * A cache of constructors of generated {@link OnTimerInvoker} classes,
+   * keyed by {@link OnTimerMethodSpecifier}.
    *
    * <p>Needed because generating an invoker class is expensive, and to avoid generating an
    * excessive number of classes consuming PermGen memory in Java's that still have PermGen.
    */
-  private final LoadingCache<Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>>
-      constructorCache =
-          CacheBuilder.newBuilder()
-              .build(
-                  new CacheLoader<
-                      Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>>() {
-                    @Override
-                    public LoadingCache<String, Constructor<?>> load(
-                        final Class<? extends DoFn<?, ?>> fnClass) throws Exception {
-                      return CacheBuilder.newBuilder().build(new OnTimerConstructorLoader(fnClass));
-                    }
-                  });
-
-  /**
-   * A cache loader fixed to a particular {@link DoFn} class that loads constructors for the
-   * invokers for its {@link OnTimer @OnTimer} methods.
-   */
-  private static class OnTimerConstructorLoader extends CacheLoader<String, Constructor<?>> {
-
-    private final DoFnSignature signature;
-
-    public OnTimerConstructorLoader(Class<? extends DoFn<?, ?>> clazz) {
-      this.signature = DoFnSignatures.getSignature(clazz);
-    }
-
-    @Override
-    public Constructor<?> load(String timerId) throws Exception {
-      Class<? extends OnTimerInvoker<?, ?>> invokerClass =
-          generateOnTimerInvokerClass(signature, timerId);
-      try {
-        return invokerClass.getConstructor(signature.fnClass());
-      } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  /**
+  private final LoadingCache<OnTimerMethodSpecifier, Constructor<?>> constructorCache =
+          CacheBuilder.newBuilder().build(
+          new CacheLoader<OnTimerMethodSpecifier, Constructor<?>>() {
+              @Override
+              public Constructor<?> load(final OnTimerMethodSpecifier onTimerMethodSpecifier)
+                      throws Exception {
+                  DoFnSignature signature =
+                          DoFnSignatures.getSignature(onTimerMethodSpecifier.fnClass());
+                  Class<? extends OnTimerInvoker<?, ?>> invokerClass =
+                          generateOnTimerInvokerClass(signature, onTimerMethodSpecifier.timerId());
+                  try {
+                      return invokerClass.getConstructor(signature.fnClass());
+                  } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
+                      throw new RuntimeException(e);
+                  }
+
+              }
+          });
+    /**
    * Generates a {@link OnTimerInvoker} class for the given {@link DoFnSignature} and {@link
    * TimerId}.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/c8d98336/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java
new file mode 100644
index 0000000..edf7e3c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Used by {@link ByteBuddyOnTimerInvokerFactory} to dynamically generate
+ * {@link OnTimerInvoker} instances for invoking a particular
+ * {@link DoFn.TimerId} on a particular {@link DoFn}.
+ */
+
+@AutoValue
+abstract class OnTimerMethodSpecifier {
+    public abstract Class<? extends DoFn<?, ?>> fnClass();
+    public abstract String timerId();
+    public static OnTimerMethodSpecifier
+    forClassAndTimerId(Class<? extends DoFn<?, ?>> fnClass, String timerId){
+        return  new AutoValue_OnTimerMethodSpecifier(fnClass, timerId);
+    }
+}