You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/08/17 23:05:27 UTC
[3/4] incubator-beam git commit: Rewrites DoFnReflector to go via
DoFnSignature
Rewrites DoFnReflector to go via DoFnSignature
DoFnSignature encapsulates type information about a DoFn,
in particular which arguments/features its methods
actually use.
Before this commit, DoFnReflector would parse/verify/generate
code in one go; after this commit, these stages are separated:
DoFnSignature encapsulates all information needed to generate
the code.
Additionally, removes the unnecessary genericity in the
implementation of DoFnReflector's code generation for the
very different methods processElement and start/finishBundle.
The code is simpler if decomposed into utility functions,
rather than attempting a uniform representation for different
methods.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fbf77f90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fbf77f90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fbf77f90
Branch: refs/heads/master
Commit: fbf77f90e0391304a580178f99441256526c4b0e
Parents: 4609773
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Aug 9 17:16:00 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Aug 17 15:43:46 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/DoFn.java | 17 +-
.../beam/sdk/transforms/DoFnAdapters.java | 281 +++++
.../beam/sdk/transforms/DoFnReflector.java | 1150 ------------------
.../apache/beam/sdk/transforms/DoFnTester.java | 2 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 6 +-
.../sdk/transforms/reflect/DoFnInvoker.java | 61 +
.../sdk/transforms/reflect/DoFnInvokers.java | 506 ++++++++
.../sdk/transforms/reflect/DoFnSignature.java | 113 ++
.../sdk/transforms/reflect/DoFnSignatures.java | 321 +++++
.../sdk/transforms/reflect/package-info.java | 23 +
.../beam/sdk/transforms/DoFnReflectorTest.java | 822 -------------
.../apache/beam/sdk/transforms/FlattenTest.java | 4 +-
.../dofnreflector/DoFnReflectorTestHelper.java | 116 --
.../transforms/reflect/DoFnInvokersTest.java | 498 ++++++++
.../reflect/DoFnInvokersTestHelper.java | 116 ++
.../transforms/reflect/DoFnSignaturesTest.java | 371 ++++++
.../transforms/DoFnInvokersBenchmark.java | 224 ++++
.../transforms/DoFnReflectorBenchmark.java | 232 ----
18 files changed, 2529 insertions(+), 2334 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 80b67af..2348783 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollectionView;
@@ -247,7 +248,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
/////////////////////////////////////////////////////////////////////////////
- Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>();
+ protected Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>();
/**
* Protects aggregators from being created after initialization.
@@ -283,7 +284,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
/**
* Interface for runner implementors to provide implementations of extra context information.
*
- * <p>The methods on this interface are called by {@link DoFnReflector} before invoking an
+ * <p>The methods on this interface are called by {@link DoFnInvoker} before invoking an
* annotated {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that
* has indicated it needs the given extra context.
*
@@ -301,23 +302,23 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
BoundedWindow window();
/**
- * A placeholder for testing purposes. The return type itself is package-private and not
- * implemented.
+ * A placeholder for testing purposes.
*/
InputProvider<InputT> inputProvider();
/**
- * A placeholder for testing purposes. The return type itself is package-private and not
- * implemented.
+ * A placeholder for testing purposes.
*/
OutputReceiver<OutputT> outputReceiver();
}
- static interface OutputReceiver<T> {
+ /** A placeholder for testing handling of output types during {@link DoFn} reflection. */
+ public interface OutputReceiver<T> {
void output(T output);
}
- static interface InputProvider<T> {
+ /** A placeholder for testing handling of input types during {@link DoFn} reflection. */
+ public interface InputProvider<T> {
T get();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
new file mode 100644
index 0000000..71a148f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+
+/**
+ * Utility class containing adapters for running a {@link DoFn} as an {@link OldDoFn}.
+ *
+ * @deprecated This class will go away when we start running {@link DoFn}'s directly (using
+ * {@link DoFnInvoker}) rather than via {@link OldDoFn}.
+ */
+@Deprecated
+public class DoFnAdapters {
+ /** Should not be instantiated. */
+ private DoFnAdapters() {}
+
+ /**
+ * If this is an {@link OldDoFn} produced via {@link #toOldDoFn}, returns the class of the
+ * original {@link DoFn}, otherwise returns {@code fn.getClass()}.
+ */
+ public static Class<?> getDoFnClass(OldDoFn<?, ?> fn) {
+ if (fn instanceof SimpleDoFnAdapter) {
+ return ((SimpleDoFnAdapter<?, ?>) fn).fn.getClass();
+ } else {
+ return fn.getClass();
+ }
+ }
+
+ /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
+ public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
+ DoFnSignature signature = DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass());
+ if (signature.processElement().usesSingleWindow()) {
+ return new WindowDoFnAdapter<>(fn);
+ } else {
+ return new SimpleDoFnAdapter<>(fn);
+ }
+ }
+
+ /**
+ * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
+ * OldDoFn}.
+ */
+ private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
+ private final DoFn<InputT, OutputT> fn;
+ private transient DoFnInvoker<InputT, OutputT> invoker;
+
+ SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
+ super(fn.aggregators);
+ this.fn = fn;
+ this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+ }
+
+ @Override
+ public void setup() throws Exception {
+ this.invoker.invokeSetup();
+ }
+
+ @Override
+ public void startBundle(Context c) throws Exception {
+ this.fn.prepareForProcessing();
+ invoker.invokeStartBundle(new ContextAdapter<>(fn, c));
+ }
+
+ @Override
+ public void finishBundle(Context c) throws Exception {
+ invoker.invokeFinishBundle(new ContextAdapter<>(fn, c));
+ }
+
+ @Override
+ public void teardown() throws Exception {
+ this.invoker.invokeTeardown();
+ }
+
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
+ invoker.invokeProcessElement(adapter, adapter);
+ }
+
+ @Override
+ protected TypeDescriptor<InputT> getInputTypeDescriptor() {
+ return fn.getInputTypeDescriptor();
+ }
+
+ @Override
+ protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+ return fn.getOutputTypeDescriptor();
+ }
+
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return fn.getAllowedTimestampSkew();
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.include(fn);
+ }
+
+ private void readObject(java.io.ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+ }
+ }
+
+ /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */
+ private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT>
+ implements OldDoFn.RequiresWindowAccess {
+
+ WindowDoFnAdapter(DoFn<InputT, OutputT> fn) {
+ super(fn);
+ }
+ }
+
+ /**
+ * Wraps an {@link OldDoFn.Context} as a {@link DoFn.ExtraContextFactory} inside a {@link
+ * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is
+ * unavailable.
+ */
+ private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
+ implements DoFn.ExtraContextFactory<InputT, OutputT> {
+
+ private OldDoFn<InputT, OutputT>.Context context;
+
+ private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
+ fn.super();
+ this.context = context;
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return context.getPipelineOptions();
+ }
+
+ @Override
+ public void output(OutputT output) {
+ context.output(output);
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ context.outputWithTimestamp(output, timestamp);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ context.sideOutput(tag, output);
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ context.sideOutputWithTimestamp(tag, output, timestamp);
+ }
+
+ @Override
+ public BoundedWindow window() {
+ // The DoFn doesn't allow us to ask for these outside ProcessElements, so this
+ // should be unreachable.
+ throw new UnsupportedOperationException("Can only get the window in ProcessElements");
+ }
+
+ @Override
+ public DoFn.InputProvider<InputT> inputProvider() {
+ throw new UnsupportedOperationException("inputProvider() exists only for testing");
+ }
+
+ @Override
+ public DoFn.OutputReceiver<OutputT> outputReceiver() {
+ throw new UnsupportedOperationException("outputReceiver() exists only for testing");
+ }
+ }
+
+ /**
+ * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFn.ExtraContextFactory} inside a {@link
+ * DoFn.ProcessElement} method.
+ */
+ private static class ProcessContextAdapter<InputT, OutputT>
+ extends DoFn<InputT, OutputT>.ProcessContext
+ implements DoFn.ExtraContextFactory<InputT, OutputT> {
+
+ private OldDoFn<InputT, OutputT>.ProcessContext context;
+
+ private ProcessContextAdapter(
+ DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) {
+ fn.super();
+ this.context = context;
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return context.getPipelineOptions();
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ return context.sideInput(view);
+ }
+
+ @Override
+ public void output(OutputT output) {
+ context.output(output);
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ context.outputWithTimestamp(output, timestamp);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ context.sideOutput(tag, output);
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ context.sideOutputWithTimestamp(tag, output, timestamp);
+ }
+
+ @Override
+ public InputT element() {
+ return context.element();
+ }
+
+ @Override
+ public Instant timestamp() {
+ return context.timestamp();
+ }
+
+ @Override
+ public PaneInfo pane() {
+ return context.pane();
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return context.window();
+ }
+
+ @Override
+ public DoFn.InputProvider<InputT> inputProvider() {
+ throw new UnsupportedOperationException("inputProvider() exists only for testing");
+ }
+
+ @Override
+ public DoFn.OutputReceiver<OutputT> outputReceiver() {
+ throw new UnsupportedOperationException("outputReceiver() exists only for testing");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
deleted file mode 100644
index bf04041..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
+++ /dev/null
@@ -1,1150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
-import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
-import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
-import org.apache.beam.sdk.transforms.DoFn.Setup;
-import org.apache.beam.sdk.transforms.DoFn.StartBundle;
-import org.apache.beam.sdk.transforms.DoFn.Teardown;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.reflect.TypeParameter;
-import com.google.common.reflect.TypeToken;
-
-import net.bytebuddy.ByteBuddy;
-import net.bytebuddy.NamingStrategy.SuffixingRandom;
-import net.bytebuddy.description.field.FieldDescription;
-import net.bytebuddy.description.method.MethodDescription;
-import net.bytebuddy.description.method.ParameterList;
-import net.bytebuddy.description.modifier.FieldManifestation;
-import net.bytebuddy.description.modifier.Visibility;
-import net.bytebuddy.description.type.TypeDescription;
-import net.bytebuddy.description.type.TypeDescription.Generic;
-import net.bytebuddy.dynamic.DynamicType;
-import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
-import net.bytebuddy.dynamic.scaffold.InstrumentedType;
-import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy.Default;
-import net.bytebuddy.implementation.Implementation;
-import net.bytebuddy.implementation.MethodCall.MethodLocator;
-import net.bytebuddy.implementation.StubMethod;
-import net.bytebuddy.implementation.bind.MethodDelegationBinder.MethodInvoker;
-import net.bytebuddy.implementation.bind.annotation.TargetMethodAnnotationDrivenBinder.TerminationHandler;
-import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
-import net.bytebuddy.implementation.bytecode.Duplication;
-import net.bytebuddy.implementation.bytecode.StackManipulation;
-import net.bytebuddy.implementation.bytecode.Throw;
-import net.bytebuddy.implementation.bytecode.assign.Assigner;
-import net.bytebuddy.implementation.bytecode.member.FieldAccess;
-import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
-import net.bytebuddy.implementation.bytecode.member.MethodReturn;
-import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
-import net.bytebuddy.jar.asm.Label;
-import net.bytebuddy.jar.asm.MethodVisitor;
-import net.bytebuddy.jar.asm.Opcodes;
-import net.bytebuddy.matcher.ElementMatchers;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-
-/**
- * Utility implementing the necessary reflection for working with {@link DoFn}s.
- */
-public abstract class DoFnReflector {
-
- private static final String FN_DELEGATE_FIELD_NAME = "delegate";
-
- private enum Availability {
- /** Indicates parameters only available in {@code @ProcessElement} methods. */
- PROCESS_ELEMENT_ONLY,
- /** Indicates parameters available in all methods. */
- EVERYWHERE
- }
-
- /**
- * Enumeration of the parameters available from the {@link ExtraContextFactory} to use as
- * additional parameters for {@link DoFn} methods.
- * <p>
- * We don't rely on looking for properly annotated methods within {@link ExtraContextFactory}
- * because erasure would make it impossible to completely fill in the type token for context
- * parameters that depend on the input/output type.
- */
- private enum AdditionalParameter {
-
- /** Any {@link BoundedWindow} parameter is populated by the window of the current element. */
- WINDOW_OF_ELEMENT(Availability.PROCESS_ELEMENT_ONLY, BoundedWindow.class, "window") {
- @Override
- public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
- return TypeToken.of(BoundedWindow.class);
- }
- },
-
- INPUT_PROVIDER(Availability.PROCESS_ELEMENT_ONLY, DoFn.InputProvider.class, "inputProvider") {
- @Override
- public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
- return new TypeToken<DoFn.InputProvider<InputT>>() {}.where(
- new TypeParameter<InputT>() {}, in);
- }
-
- @Override
- public boolean isHidden() {
- return true;
- }
- },
-
- OUTPUT_RECEIVER(
- Availability.PROCESS_ELEMENT_ONLY, DoFn.OutputReceiver.class, "outputReceiver") {
- @Override
- public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
- return new TypeToken<DoFn.OutputReceiver<OutputT>>() {}.where(
- new TypeParameter<OutputT>() {}, out);
- }
-
- @Override
- public boolean isHidden() {
- return true;
- }
- };
-
- /**
- * Create a type token representing the given parameter. May use the type token associated
- * with the input and output types of the {@link DoFn}, depending on the extra
- * context.
- */
- abstract <InputT, OutputT> TypeToken<?> tokenFor(
- TypeToken<InputT> in, TypeToken<OutputT> out);
-
- /**
- * Indicates whether this enum is for testing only, hence should not appear in error messages,
- * etc. Defaults to {@code false}.
- */
- boolean isHidden() {
- return false;
- }
-
- private final Class<?> rawType;
- private final Availability availability;
- private final transient MethodDescription method;
-
- private AdditionalParameter(Availability availability, Class<?> rawType, String method) {
- this.availability = availability;
- this.rawType = rawType;
- try {
- this.method = new MethodDescription.ForLoadedMethod(
- ExtraContextFactory.class.getMethod(method));
- } catch (NoSuchMethodException | SecurityException e) {
- throw new RuntimeException(
- "Unable to access method " + method + " on " + ExtraContextFactory.class, e);
- }
- }
- }
-
- private static final Map<Class<?>, AdditionalParameter> EXTRA_CONTEXTS;
- private static final Map<Class<?>, AdditionalParameter> EXTRA_PROCESS_CONTEXTS;
-
- static {
- ImmutableMap.Builder<Class<?>, AdditionalParameter> everywhereBuilder =
- ImmutableMap.<Class<?>, AdditionalParameter>builder();
- ImmutableMap.Builder<Class<?>, AdditionalParameter> processElementBuilder =
- ImmutableMap.<Class<?>, AdditionalParameter>builder();
-
- for (AdditionalParameter value : AdditionalParameter.values()) {
- switch (value.availability) {
- case EVERYWHERE:
- everywhereBuilder.put(value.rawType, value);
- break;
- case PROCESS_ELEMENT_ONLY:
- processElementBuilder.put(value.rawType, value);
- break;
- }
- }
-
- EXTRA_CONTEXTS = everywhereBuilder.build();
- EXTRA_PROCESS_CONTEXTS = processElementBuilder
- // Process Element contexts include everything available everywhere
- .putAll(EXTRA_CONTEXTS)
- .build();
- }
-
- /**
- * @return true if the reflected {@link DoFn} uses a Single Window.
- */
- public abstract boolean usesSingleWindow();
-
- /** Create an {@link DoFnInvoker} bound to the given {@link OldDoFn}. */
- public abstract <InputT, OutputT> DoFnInvoker<InputT, OutputT> bindInvoker(
- DoFn<InputT, OutputT> fn);
-
- private static final Map<Class<?>, DoFnReflector> REFLECTOR_CACHE =
- new LinkedHashMap<Class<?>, DoFnReflector>();
-
- /**
- * @return the {@link DoFnReflector} for the given {@link DoFn}.
- */
- public static DoFnReflector of(
- @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) {
- DoFnReflector reflector = REFLECTOR_CACHE.get(fn);
- if (reflector != null) {
- return reflector;
- }
-
- reflector = new GenericDoFnReflector(fn);
- REFLECTOR_CACHE.put(fn, reflector);
- return reflector;
- }
-
- /**
- * Create a {@link OldDoFn} that the {@link DoFn}.
- */
- public <InputT, OutputT> OldDoFn<InputT, OutputT> toDoFn(DoFn<InputT, OutputT> fn) {
- if (usesSingleWindow()) {
- return new WindowDoFnAdapter<InputT, OutputT>(this, fn);
- } else {
- return new SimpleDoFnAdapter<InputT, OutputT>(this, fn);
- }
- }
-
- private static String formatType(TypeToken<?> t) {
- return ReflectHelpers.TYPE_SIMPLE_DESCRIPTION.apply(t.getType());
- }
-
- private static String format(Method m) {
- return ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(m);
- }
-
- private static Collection<String> describeSupportedTypes(
- Map<Class<?>, AdditionalParameter> extraProcessContexts,
- final TypeToken<?> in, final TypeToken<?> out) {
- return FluentIterable
- .from(extraProcessContexts.values())
- .filter(new Predicate<AdditionalParameter>() {
- @Override
- public boolean apply(@Nonnull AdditionalParameter additionalParameter) {
- return !additionalParameter.isHidden();
- }
- })
- .transform(new Function<AdditionalParameter, String>() {
- @Override
- @Nonnull
- public String apply(@Nonnull AdditionalParameter input) {
- return formatType(input.tokenFor(in, out));
- }
- })
- .toSortedSet(String.CASE_INSENSITIVE_ORDER);
- }
-
- @VisibleForTesting
- static <InputT, OutputT> List<AdditionalParameter> verifyProcessMethodArguments(Method m) {
- return verifyMethodArguments(m,
- EXTRA_PROCESS_CONTEXTS,
- new TypeToken<DoFn<InputT, OutputT>.ProcessContext>() {},
- new TypeParameter<InputT>() {},
- new TypeParameter<OutputT>() {});
- }
-
- @VisibleForTesting
- static <InputT, OutputT> List<AdditionalParameter> verifyBundleMethodArguments(Method m) {
- if (m == null) {
- return null;
- }
- return verifyMethodArguments(m,
- EXTRA_CONTEXTS,
- new TypeToken<DoFn<InputT, OutputT>.Context>() {},
- new TypeParameter<InputT>() {},
- new TypeParameter<OutputT>() {});
- }
-
- @VisibleForTesting
- static void verifyLifecycleMethodArguments(Method m) {
- if (m == null) {
- return;
- }
- checkState(void.class.equals(m.getReturnType()), "%s must have void return type", format(m));
- checkState(m.getGenericParameterTypes().length == 0, "%s must take zero arguments", format(m));
- }
-
- /**
- * Verify the method arguments for a given {@link DoFn} method.
- *
- * <p>The requirements for a method to be valid, are:
- * <ol>
- * <li>The method has at least one argument.
- * <li>The first argument is of type firstContextArg.
- * <li>The remaining arguments have raw types that appear in {@code contexts}
- * <li>Any generics on the extra context arguments match what is expected. Currently, this
- * is exercised only by placeholders. For example, {@code InputReceiver<InputT> must either match
- * the {@code InputT} {@code OldDoFn<InputT, OutputT>.ProcessContext} or use a wildcard, etc.
- * </ol>
- *
- * @param m the method to verify
- * @param contexts mapping from raw classes to the {@link AdditionalParameter} used
- * to create new instances.
- * @param firstContextArg the expected type of the first context argument
- * @param iParam TypeParameter representing the input type
- * @param oParam TypeParameter representing the output type
- */
- @VisibleForTesting
- static <InputT, OutputT> List<AdditionalParameter> verifyMethodArguments(
- Method m,
- Map<Class<?>, AdditionalParameter> contexts,
- TypeToken<?> firstContextArg,
- TypeParameter<InputT> iParam,
- TypeParameter<OutputT> oParam) {
-
- if (!void.class.equals(m.getReturnType())) {
- throw new IllegalStateException(String.format(
- "%s must have a void return type", format(m)));
- }
- if (m.isVarArgs()) {
- throw new IllegalStateException(String.format(
- "%s must not have var args", format(m)));
- }
-
- // The first parameter must be present, and must be the specified type
- Type[] params = m.getGenericParameterTypes();
- TypeToken<?> contextToken = null;
- if (params.length > 0) {
- contextToken = TypeToken.of(params[0]);
- }
- if (contextToken == null
- || !contextToken.getRawType().equals(firstContextArg.getRawType())) {
- throw new IllegalStateException(String.format(
- "%s must take a %s as its first argument",
- format(m), firstContextArg.getRawType().getSimpleName()));
- }
- AdditionalParameter[] contextInfos = new AdditionalParameter[params.length - 1];
-
- // Fill in the generics in the allExtraContextArgs interface from the types in the
- // Context or ProcessContext OldDoFn.
- ParameterizedType pt = (ParameterizedType) contextToken.getType();
- // We actually want the owner, since ProcessContext and Context are owned by DoFn.
- pt = (ParameterizedType) pt.getOwnerType();
- @SuppressWarnings("unchecked")
- TypeToken<InputT> iActual = (TypeToken<InputT>) TypeToken.of(pt.getActualTypeArguments()[0]);
- @SuppressWarnings("unchecked")
- TypeToken<OutputT> oActual = (TypeToken<OutputT>) TypeToken.of(pt.getActualTypeArguments()[1]);
-
- // All of the remaining parameters must be a super-interface of allExtraContextArgs
- // that is not listed in the EXCLUDED_INTERFACES set.
- for (int i = 1; i < params.length; i++) {
- TypeToken<?> param = TypeToken.of(params[i]);
-
- AdditionalParameter info = contexts.get(param.getRawType());
- if (info == null) {
- throw new IllegalStateException(String.format(
- "%s is not a valid context parameter for method %s. Should be one of %s",
- formatType(param), format(m),
- describeSupportedTypes(contexts, iActual, oActual)));
- }
-
- // If we get here, the class matches, but maybe the generics don't:
- TypeToken<?> expected = info.tokenFor(iActual, oActual);
- if (!expected.isSubtypeOf(param)) {
- throw new IllegalStateException(String.format(
- "Incompatible generics in context parameter %s for method %s. Should be %s",
- formatType(param), format(m), formatType(info.tokenFor(iActual, oActual))));
- }
-
- // Register the (now validated) context info
- contextInfos[i - 1] = info;
- }
- return ImmutableList.copyOf(contextInfos);
- }
-
- /** Interface for invoking the {@code OldDoFn} processing methods. */
- public interface DoFnInvoker<InputT, OutputT> {
- /** Invoke {@link OldDoFn#setup} on the bound {@code OldDoFn}. */
- void invokeSetup();
- /** Invoke {@link OldDoFn#startBundle} on the bound {@code OldDoFn}. */
- void invokeStartBundle(
- DoFn<InputT, OutputT>.Context c,
- ExtraContextFactory<InputT, OutputT> extra);
- /** Invoke {@link OldDoFn#finishBundle} on the bound {@code OldDoFn}. */
- void invokeFinishBundle(
- DoFn<InputT, OutputT>.Context c,
- ExtraContextFactory<InputT, OutputT> extra);
-
- /** Invoke {@link OldDoFn#teardown()} on the bound {@code DoFn}. */
- void invokeTeardown();
-
- /** Invoke {@link OldDoFn#processElement} on the bound {@code OldDoFn}. */
- public void invokeProcessElement(
- DoFn<InputT, OutputT>.ProcessContext c,
- ExtraContextFactory<InputT, OutputT> extra);
- }
-
- /**
- * Implementation of {@link DoFnReflector} for the arbitrary {@link DoFn}.
- */
- private static class GenericDoFnReflector extends DoFnReflector {
-
- private final Method setup;
- private final Method startBundle;
- private final Method processElement;
- private final Method finishBundle;
- private final Method teardown;
- private final List<AdditionalParameter> processElementArgs;
- private final List<AdditionalParameter> startBundleArgs;
- private final List<AdditionalParameter> finishBundleArgs;
- private final Constructor<?> constructor;
-
- private GenericDoFnReflector(
- @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) {
- // Locate the annotated methods
- this.processElement = findAnnotatedMethod(ProcessElement.class, fn, true);
- this.setup = findAnnotatedMethod(Setup.class, fn, false);
- this.startBundle = findAnnotatedMethod(StartBundle.class, fn, false);
- this.finishBundle = findAnnotatedMethod(FinishBundle.class, fn, false);
- this.teardown = findAnnotatedMethod(Teardown.class, fn, false);
-
- // Verify that their method arguments satisfy our conditions.
- this.processElementArgs = verifyProcessMethodArguments(processElement);
- this.startBundleArgs = verifyBundleMethodArguments(startBundle);
- this.finishBundleArgs = verifyBundleMethodArguments(finishBundle);
- verifyLifecycleMethodArguments(setup);
- verifyLifecycleMethodArguments(teardown);
-
- this.constructor = createInvokerConstructor(fn);
- }
-
- private static Collection<Method> declaredMethodsWithAnnotation(
- Class<? extends Annotation> anno,
- Class<?> startClass, Class<?> stopClass) {
- Collection<Method> matches = new ArrayList<>();
-
- Class<?> clazz = startClass;
- LinkedHashSet<Class<?>> interfaces = new LinkedHashSet<>();
-
- // First, find all declared methods on the startClass and parents (up to stopClass)
- while (clazz != null && !clazz.equals(stopClass)) {
- for (Method method : clazz.getDeclaredMethods()) {
- if (method.isAnnotationPresent(anno)) {
- matches.add(method);
- }
- }
-
- Collections.addAll(interfaces, clazz.getInterfaces());
-
- clazz = clazz.getSuperclass();
- }
-
- // Now, iterate over all the discovered interfaces
- for (Method method : ReflectHelpers.getClosureOfMethodsOnInterfaces(interfaces)) {
- if (method.isAnnotationPresent(anno)) {
- matches.add(method);
- }
- }
- return matches;
- }
-
- private static Method findAnnotatedMethod(
- Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
- Collection<Method> matches = declaredMethodsWithAnnotation(
- anno, fnClazz, DoFn.class);
-
- if (matches.size() == 0) {
- if (required) {
- throw new IllegalStateException(String.format(
- "No method annotated with @%s found in %s",
- anno.getSimpleName(), fnClazz.getName()));
- } else {
- return null;
- }
- }
-
- // If we have at least one match, then either it should be the only match
- // or it should be an extension of the other matches (which came from parent
- // classes).
- Method first = matches.iterator().next();
- for (Method other : matches) {
- if (!first.getName().equals(other.getName())
- || !Arrays.equals(first.getParameterTypes(), other.getParameterTypes())) {
- throw new IllegalStateException(String.format(
- "Found multiple methods annotated with @%s. [%s] and [%s]",
- anno.getSimpleName(), format(first), format(other)));
- }
- }
-
- // We need to be able to call it. We require it is public.
- if ((first.getModifiers() & Modifier.PUBLIC) == 0) {
- throw new IllegalStateException(format(first) + " must be public");
- }
-
- // And make sure its not static.
- if ((first.getModifiers() & Modifier.STATIC) != 0) {
- throw new IllegalStateException(format(first) + " must not be static");
- }
-
- return first;
- }
-
- @Override
- public boolean usesSingleWindow() {
- return usesContext(AdditionalParameter.WINDOW_OF_ELEMENT);
- }
-
- private boolean usesContext(AdditionalParameter param) {
- return processElementArgs.contains(param)
- || (startBundleArgs != null && startBundleArgs.contains(param))
- || (finishBundleArgs != null && finishBundleArgs.contains(param));
- }
-
- /**
- * Use ByteBuddy to generate the code for a {@link DoFnInvoker} that invokes the given
- * {@link DoFn}.
- * @param clazz
- * @return
- */
- private Constructor<? extends DoFnInvoker<?, ?>> createInvokerConstructor(
- @SuppressWarnings("rawtypes") Class<? extends DoFn> clazz) {
-
- final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(clazz);
-
- DynamicType.Builder<?> builder = new ByteBuddy()
- // Create subclasses inside the target class, to have access to
- // private and package-private bits
- .with(new SuffixingRandom("auxiliary") {
- @Override
- public String subclass(Generic superClass) {
- return super.name(clazzDescription);
- }
- })
- // Create a subclass of DoFnInvoker
- .subclass(DoFnInvoker.class, Default.NO_CONSTRUCTORS)
- .defineField(FN_DELEGATE_FIELD_NAME, clazz, Visibility.PRIVATE, FieldManifestation.FINAL)
- // Define a constructor to populate fields appropriately.
- .defineConstructor(Visibility.PUBLIC)
- .withParameter(clazz)
- .intercept(new InvokerConstructor())
- // Implement the three methods by calling into the appropriate functions on the fn.
- .method(ElementMatchers.named("invokeProcessElement"))
- .intercept(InvokerDelegation.create(
- processElement, BeforeDelegation.NOOP, processElementArgs))
- .method(ElementMatchers.named("invokeStartBundle"))
- .intercept(InvokerDelegation.create(
- startBundle, BeforeDelegation.INVOKE_PREPARE_FOR_PROCESSING, startBundleArgs))
- .method(ElementMatchers.named("invokeFinishBundle"))
- .intercept(InvokerDelegation.create(finishBundle,
- BeforeDelegation.NOOP,
- finishBundleArgs))
- .method(ElementMatchers.named("invokeSetup"))
- .intercept(InvokerDelegation.create(setup,
- BeforeDelegation.NOOP,
- Collections.<AdditionalParameter>emptyList()))
- .method(ElementMatchers.named("invokeTeardown"))
- .intercept(InvokerDelegation.create(teardown,
- BeforeDelegation.NOOP,
- Collections.<AdditionalParameter>emptyList()));
-
- @SuppressWarnings("unchecked")
- Class<? extends DoFnInvoker<?, ?>> dynamicClass = (Class<? extends DoFnInvoker<?, ?>>) builder
- .make()
- .load(getClass().getClassLoader(), ClassLoadingStrategy.Default.INJECTION)
- .getLoaded();
- try {
- return dynamicClass.getConstructor(clazz);
- } catch (IllegalArgumentException
- | NoSuchMethodException
- | SecurityException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public <InputT, OutputT> DoFnInvoker<InputT, OutputT> bindInvoker(
- DoFn<InputT, OutputT> fn) {
- try {
- @SuppressWarnings("unchecked")
- DoFnInvoker<InputT, OutputT> invoker =
- (DoFnInvoker<InputT, OutputT>) constructor.newInstance(fn);
- return invoker;
- } catch (InstantiationException
- | IllegalAccessException
- | IllegalArgumentException
- | InvocationTargetException
- | SecurityException e) {
- throw new RuntimeException("Unable to bind invoker for " + fn.getClass(), e);
- }
- }
- }
-
- private static class ContextAdapter<InputT, OutputT>
- extends DoFn<InputT, OutputT>.Context
- implements DoFn.ExtraContextFactory<InputT, OutputT> {
-
- private OldDoFn<InputT, OutputT>.Context context;
-
- private ContextAdapter(
- DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
- fn.super();
- this.context = context;
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return context.getPipelineOptions();
- }
-
- @Override
- public void output(OutputT output) {
- context.output(output);
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- context.outputWithTimestamp(output, timestamp);
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- context.sideOutput(tag, output);
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- context.sideOutputWithTimestamp(tag, output, timestamp);
- }
-
- @Override
- public BoundedWindow window() {
- // The DoFn doesn't allow us to ask for these outside ProcessElements, so this
- // should be unreachable.
- throw new UnsupportedOperationException("Can only get the window in ProcessElements");
- }
-
- @Override
- public DoFn.InputProvider<InputT> inputProvider() {
- throw new UnsupportedOperationException("inputProvider() exists only for testing");
- }
-
- @Override
- public DoFn.OutputReceiver<OutputT> outputReceiver() {
- throw new UnsupportedOperationException("outputReceiver() exists only for testing");
- }
- }
-
- private static class ProcessContextAdapter<InputT, OutputT>
- extends DoFn<InputT, OutputT>.ProcessContext
- implements DoFn.ExtraContextFactory<InputT, OutputT> {
-
- private OldDoFn<InputT, OutputT>.ProcessContext context;
-
- private ProcessContextAdapter(
- DoFn<InputT, OutputT> fn,
- OldDoFn<InputT, OutputT>.ProcessContext context) {
- fn.super();
- this.context = context;
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return context.getPipelineOptions();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return context.sideInput(view);
- }
-
- @Override
- public void output(OutputT output) {
- context.output(output);
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- context.outputWithTimestamp(output, timestamp);
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- context.sideOutput(tag, output);
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- context.sideOutputWithTimestamp(tag, output, timestamp);
- }
-
- @Override
- public InputT element() {
- return context.element();
- }
-
- @Override
- public Instant timestamp() {
- return context.timestamp();
- }
-
- @Override
- public PaneInfo pane() {
- return context.pane();
- }
-
- @Override
- public BoundedWindow window() {
- return context.window();
- }
-
- @Override
- public DoFn.InputProvider<InputT> inputProvider() {
- throw new UnsupportedOperationException("inputProvider() exists only for testing");
- }
-
- @Override
- public DoFn.OutputReceiver<OutputT> outputReceiver() {
- throw new UnsupportedOperationException("outputReceiver() exists only for testing");
- }
- }
-
- public static Class<?> getDoFnClass(OldDoFn<?, ?> fn) {
- if (fn instanceof SimpleDoFnAdapter) {
- return ((SimpleDoFnAdapter<?, ?>) fn).fn.getClass();
- } else {
- return fn.getClass();
- }
- }
-
- private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
-
- private final DoFn<InputT, OutputT> fn;
- private transient DoFnInvoker<InputT, OutputT> invoker;
-
- private SimpleDoFnAdapter(DoFnReflector reflector, DoFn<InputT, OutputT> fn) {
- super(fn.aggregators);
- this.fn = fn;
- this.invoker = reflector.bindInvoker(fn);
- }
-
- @Override
- public void setup() throws Exception {
- invoker.invokeSetup();
- }
-
- @Override
- public void startBundle(OldDoFn<InputT, OutputT>.Context c) throws Exception {
- ContextAdapter<InputT, OutputT> adapter = new ContextAdapter<>(fn, c);
- invoker.invokeStartBundle(adapter, adapter);
- }
-
- @Override
- public void finishBundle(OldDoFn<InputT, OutputT>.Context c) throws Exception {
- ContextAdapter<InputT, OutputT> adapter = new ContextAdapter<>(fn, c);
- invoker.invokeFinishBundle(adapter, adapter);
- }
-
- @Override
- public void teardown() {
- invoker.invokeTeardown();
- }
-
- @Override
- public void processElement(OldDoFn<InputT, OutputT>.ProcessContext c) throws Exception {
- ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
- invoker.invokeProcessElement(adapter, adapter);
- }
-
- @Override
- protected TypeDescriptor<InputT> getInputTypeDescriptor() {
- return fn.getInputTypeDescriptor();
- }
-
- @Override
- protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
- return fn.getOutputTypeDescriptor();
- }
-
- @Override
- public Duration getAllowedTimestampSkew() {
- return fn.getAllowedTimestampSkew();
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.include(fn);
- }
-
- private void readObject(java.io.ObjectInputStream in)
- throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- invoker = DoFnReflector.of(fn.getClass()).bindInvoker(fn);
- }
- }
-
- private static class WindowDoFnAdapter<InputT, OutputT>
- extends SimpleDoFnAdapter<InputT, OutputT> implements OldDoFn.RequiresWindowAccess {
-
- private WindowDoFnAdapter(DoFnReflector reflector, DoFn<InputT, OutputT> fn) {
- super(reflector, fn);
- }
- }
-
- private static enum BeforeDelegation {
- NOOP {
- @Override
- StackManipulation manipulation(
- TypeDescription delegateType, MethodDescription instrumentedMethod, boolean finalStep) {
- Preconditions.checkArgument(!finalStep,
- "Shouldn't use NOOP delegation if there is nothing to do afterwards.");
- return StackManipulation.Trivial.INSTANCE;
- }
- },
- INVOKE_PREPARE_FOR_PROCESSING {
- private final Assigner assigner = Assigner.DEFAULT;
-
- @Override
- StackManipulation manipulation(
- TypeDescription delegateType, MethodDescription instrumentedMethod, boolean finalStep) {
- MethodDescription prepareMethod;
- try {
- prepareMethod = new MethodLocator.ForExplicitMethod(
- new MethodDescription.ForLoadedMethod(
- DoFn.class.getDeclaredMethod("prepareForProcessing")))
- .resolve(instrumentedMethod);
- } catch (NoSuchMethodException | SecurityException e) {
- throw new RuntimeException("Unable to locate prepareForProcessing method", e);
- }
-
- if (finalStep) {
- return new StackManipulation.Compound(
- // Invoke the prepare method
- MethodInvoker.Simple.INSTANCE.invoke(prepareMethod),
- // Return from the invokeStartBundle when we're done.
- TerminationHandler.Returning.INSTANCE.resolve(
- assigner, instrumentedMethod, prepareMethod));
- } else {
- return new StackManipulation.Compound(
- // Duplicate the delegation target so that it remains after we invoke prepare
- Duplication.duplicate(delegateType),
- // Invoke the prepare method
- MethodInvoker.Simple.INSTANCE.invoke(prepareMethod),
- // Drop the return value from prepareForProcessing
- TerminationHandler.Dropping.INSTANCE.resolve(
- assigner, instrumentedMethod, prepareMethod));
- }
- }
- };
-
- /**
- * Stack manipulation to perform prior to the delegate call.
- *
- * <ul>
- * <li>Precondition: Stack has the delegate target on top of the stack
- * <li>Postcondition: If finalStep is true, then we've returned from the method. Otherwise, the
- * stack still has the delegate target on top of the stack.
- * </ul>
- *
- * @param delegateType The type of the delegate target, in case it needs to be duplicated.
- * @param instrumentedMethod The method bing instrumented. Necessary for resolving types and
- * other information.
- * @param finalStep If true, return from the {@code invokeStartBundle} method after invoking
- * {@code prepareForProcessing} on the delegate.
- */
- abstract StackManipulation manipulation(
- TypeDescription delegateType, MethodDescription instrumentedMethod, boolean finalStep);
- }
-
- /**
- * A byte-buddy {@link Implementation} that delegates a call that receives
- * {@link AdditionalParameter} to the given {@link DoFn} method.
- */
- private static final class InvokerDelegation implements Implementation {
- @Nullable
- private final Method target;
- private final BeforeDelegation before;
- private final List<AdditionalParameter> args;
- private final Assigner assigner = Assigner.DEFAULT;
- private FieldDescription field;
-
- /**
- * Create the {@link InvokerDelegation} for the specified method.
- *
- * @param target the method to delegate to
- * @param isStartBundle whether or not this is the {@code startBundle} call
- * @param args the {@link AdditionalParameter} to be passed to the {@code target}
- */
- private InvokerDelegation(
- @Nullable Method target,
- BeforeDelegation before,
- List<AdditionalParameter> args) {
- this.target = target;
- this.before = before;
- this.args = args;
- }
-
- /**
- * Generate the {@link Implementation} of one of the life-cycle methods of a
- * {@link DoFn}.
- */
- private static Implementation create(
- @Nullable final Method target, BeforeDelegation before, List<AdditionalParameter> args) {
- if (target == null && before == BeforeDelegation.NOOP) {
- // There is no target to call and nothing needs to happen before. Just produce a stub.
- return StubMethod.INSTANCE;
- } else {
- // We need to generate a non-empty method implementation.
- return new InvokerDelegation(target, before, args);
- }
- }
-
- @Override
- public InstrumentedType prepare(InstrumentedType instrumentedType) {
- // Remember the field description of the instrumented type.
- field = instrumentedType.getDeclaredFields()
- .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME)).getOnly();
-
- // Delegating the method call doesn't require any changes to the instrumented type.
- return instrumentedType;
- }
-
- /**
- * Stack manipulation to push the {@link DoFn} reference stored in the
- * delegate field of the invoker on to the top of the stack.
- *
- * <p>This implementation is derived from the code for
- * {@code MethodCall.invoke(m).onInstanceField(clazz, delegateField)} with two key differences.
- * First, it doesn't add a synthetic field each time, which is critical to avoid duplicate field
- * definitions. Second, it uses the {@link AdditionalParameter} to populate the arguments to the
- * method.
- */
- private StackManipulation pushDelegateField() {
- return new StackManipulation.Compound(
- // Push "this" reference to the stack
- MethodVariableAccess.REFERENCE.loadOffset(0),
- // Access the delegate field of the the invoker
- FieldAccess.forField(field).getter());
- }
-
- private StackManipulation pushArgument(
- AdditionalParameter arg, MethodDescription instrumentedMethod) {
- MethodDescription transform = arg.method;
-
- return new StackManipulation.Compound(
- // Push the ExtraContextFactory which must have been argument 2 of the instrumented method
- MethodVariableAccess.REFERENCE.loadOffset(2),
- // Invoke the appropriate method to produce the context argument
- MethodInvocation.invoke(transform));
- }
-
- private StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
- MethodDescription targetMethod = new MethodLocator.ForExplicitMethod(
- new MethodDescription.ForLoadedMethod(target)).resolve(instrumentedMethod);
- ParameterList<?> params = targetMethod.getParameters();
-
- List<StackManipulation> parameters;
- if (!params.isEmpty()) {
- // Instructions to setup the parameters for the call
- parameters = new ArrayList<>(args.size() + 1);
- // 1. The first argument in the delegate method must be the context. This corresponds to
- // the first argument in the instrumented method, so copy that.
- parameters.add(MethodVariableAccess.of(params.get(0).getType().getSuperClass())
- .loadOffset(1));
- // 2. For each of the extra arguments push the appropriate value.
- for (AdditionalParameter arg : args) {
- parameters.add(pushArgument(arg, instrumentedMethod));
- }
- } else {
- parameters = Collections.emptyList();
- }
-
- return new StackManipulation.Compound(
- // Push the parameters
- new StackManipulation.Compound(parameters),
- // Invoke the target method
- wrapWithUserCodeException(MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
- // Return from the instrumented method
- TerminationHandler.Returning.INSTANCE.resolve(
- assigner, instrumentedMethod, targetMethod));
- }
-
- /**
- * Wrap a given stack manipulation in a try catch block. Any exceptions thrown within the
- * try are wrapped with a {@link UserCodeException}.
- */
- private StackManipulation wrapWithUserCodeException(
- final StackManipulation tryBody) {
- final MethodDescription createUserCodeException;
- try {
- createUserCodeException = new MethodDescription.ForLoadedMethod(
- UserCodeException.class.getDeclaredMethod("wrap", Throwable.class));
- } catch (NoSuchMethodException | SecurityException e) {
- throw new RuntimeException("Unable to find UserCodeException.wrap", e);
- }
-
- return new StackManipulation() {
- @Override
- public boolean isValid() {
- return tryBody.isValid();
- }
-
- @Override
- public Size apply(MethodVisitor mv, Context implementationContext) {
- Label tryBlockStart = new Label();
- Label tryBlockEnd = new Label();
- Label catchBlockStart = new Label();
- Label catchBlockEnd = new Label();
-
- String throwableName =
- new TypeDescription.ForLoadedType(Throwable.class).getInternalName();
- mv.visitTryCatchBlock(tryBlockStart, tryBlockEnd, catchBlockStart, throwableName);
-
- // The try block attempts to perform the expected operations, then jumps to success
- mv.visitLabel(tryBlockStart);
- Size trySize = tryBody.apply(mv, implementationContext);
- mv.visitJumpInsn(Opcodes.GOTO, catchBlockEnd);
- mv.visitLabel(tryBlockEnd);
-
- // The handler wraps the exception, and then throws.
- mv.visitLabel(catchBlockStart);
- // Add the exception to the frame
- mv.visitFrame(Opcodes.F_SAME1,
- // No local variables
- 0, new Object[] {},
- // 1 stack element (the throwable)
- 1, new Object[] { throwableName });
-
- Size catchSize = new StackManipulation.Compound(
- MethodInvocation.invoke(createUserCodeException),
- Throw.INSTANCE)
- .apply(mv, implementationContext);
-
- mv.visitLabel(catchBlockEnd);
- // The frame contents after the try/catch block is the same
- // as it was before.
- mv.visitFrame(Opcodes.F_SAME,
- // No local variables
- 0, new Object[] {},
- // No new stack variables
- 0, new Object[] {});
-
- return new Size(
- trySize.getSizeImpact(),
- Math.max(trySize.getMaximalSize(), catchSize.getMaximalSize()));
- }
- };
- }
-
- @Override
- public ByteCodeAppender appender(final Target implementationTarget) {
- return new ByteCodeAppender() {
- @Override
- public Size apply(
- MethodVisitor methodVisitor,
- Context implementationContext,
- MethodDescription instrumentedMethod) {
- StackManipulation.Size size = new StackManipulation.Compound(
- // Put the target on top of the stack
- pushDelegateField(),
- // Do any necessary pre-delegation work
- before.manipulation(field.getType().asErasure(), instrumentedMethod, target == null),
- // Invoke the target method, if there is one. If there wasn't, then isStartBundle was
- // true, and we've already emitted the appropriate return instructions.
- target != null
- ? invokeTargetMethod(instrumentedMethod)
- : StackManipulation.Trivial.INSTANCE)
- .apply(methodVisitor, implementationContext);
- return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
- }
- };
- }
- }
-
- /**
- * A constructor {@link Implementation} for a {@link DoFnInvoker class}. Produces the byte code
- * for a constructor that takes a single argument and assigns it to the delegate field.
- * {@link AdditionalParameter} to the given {@link DoFn} method.
- */
- private static final class InvokerConstructor implements Implementation {
- @Override
- public InstrumentedType prepare(InstrumentedType instrumentedType) {
- return instrumentedType;
- }
-
- @Override
- public ByteCodeAppender appender(final Target implementationTarget) {
- return new ByteCodeAppender() {
- @Override
- public Size apply(
- MethodVisitor methodVisitor,
- Context implementationContext,
- MethodDescription instrumentedMethod) {
- StackManipulation.Size size = new StackManipulation.Compound(
- // Load the this reference
- MethodVariableAccess.REFERENCE.loadOffset(0),
- // Invoke the super constructor (default constructor of Object)
- MethodInvocation
- .invoke(new TypeDescription.ForLoadedType(Object.class)
- .getDeclaredMethods()
- .filter(ElementMatchers.isConstructor()
- .and(ElementMatchers.takesArguments(0)))
- .getOnly()),
- // Load the this reference
- MethodVariableAccess.REFERENCE.loadOffset(0),
- // Load the delegate argument
- MethodVariableAccess.REFERENCE.loadOffset(1),
- // Assign the delegate argument to the delegate field
- FieldAccess.forField(implementationTarget.getInstrumentedType()
- .getDeclaredFields()
- .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
- .getOnly()).putter(),
- // Return void.
- MethodReturn.VOID
- ).apply(methodVisitor, implementationContext);
- return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
- }
- };
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index dd1baab..4cd410a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -92,7 +92,7 @@ public class DoFnTester<InputT, OutputT> {
@SuppressWarnings("unchecked")
public static <InputT, OutputT> DoFnTester<InputT, OutputT>
of(DoFn<InputT, OutputT> fn) {
- return new DoFnTester<InputT, OutputT>(DoFnReflector.of(fn.getClass()).toDoFn(fn));
+ return new DoFnTester<InputT, OutputT>(DoFnAdapters.toOldDoFn(fn));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index aa57531..af500ba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -536,7 +536,7 @@ public class ParDo {
private static <InputT, OutputT> OldDoFn<InputT, OutputT>
adapt(DoFn<InputT, OutputT> fn) {
- return DoFnReflector.of(fn.getClass()).toDoFn(fn);
+ return DoFnAdapters.toOldDoFn(fn);
}
/**
@@ -747,7 +747,7 @@ public class ParDo {
@Override
protected String getKindString() {
- Class<?> clazz = DoFnReflector.getDoFnClass(fn);
+ Class<?> clazz = DoFnAdapters.getDoFnClass(fn);
if (clazz.isAnonymousClass()) {
return "AnonymousParDo";
} else {
@@ -968,7 +968,7 @@ public class ParDo {
@Override
protected String getKindString() {
- Class<?> clazz = DoFnReflector.getDoFnClass(fn);
+ Class<?> clazz = DoFnAdapters.getDoFnClass(fn);
if (clazz.isAnonymousClass()) {
return "AnonymousParMultiDo";
} else {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
new file mode 100644
index 0000000..5818a59
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Interface for invoking the {@code DoFn} processing methods.
+ *
+ * Instantiating a {@link DoFnInvoker} associates it with a specific {@link DoFn} instance,
+ * referred to as the bound {@link DoFn}.
+ */
+public interface DoFnInvoker<InputT, OutputT> {
+ /**
+ * Invoke the {@link DoFn.Setup} method on the bound {@link DoFn}.
+ */
+ void invokeSetup();
+
+ /**
+ * Invoke the {@link DoFn.StartBundle} method on the bound {@link DoFn}.
+ *
+ * @param c The {@link DoFn.Context} to invoke the fn with.
+ */
+ void invokeStartBundle(DoFn<InputT, OutputT>.Context c);
+
+ /**
+ * Invoke the {@link DoFn.FinishBundle} method on the bound {@link DoFn}.
+ *
+ * @param c The {@link DoFn.Context} to invoke the fn with.
+ */
+ void invokeFinishBundle(DoFn<InputT, OutputT>.Context c);
+
+ /**
+ * Invoke the {@link DoFn.Teardown} method on the bound {@link DoFn}.
+ */
+ void invokeTeardown();
+
+ /**
+ * Invoke the {@link DoFn.ProcessElement} method on the bound {@link DoFn}.
+ *
+ * @param c The {@link DoFn.ProcessContext} to invoke the fn with.
+ * @param extra Factory for producing extra parameter objects (such as window), if necessary.
+ */
+ void invokeProcessElement(
+ DoFn<InputT, OutputT>.ProcessContext c, DoFn.ExtraContextFactory<InputT, OutputT> extra);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
new file mode 100644
index 0000000..73874d7
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.DoFn.Setup;
+import org.apache.beam.sdk.transforms.DoFn.StartBundle;
+import org.apache.beam.sdk.transforms.DoFn.Teardown;
+import org.apache.beam.sdk.util.UserCodeException;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.NamingStrategy;
+import net.bytebuddy.description.field.FieldDescription;
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.description.modifier.FieldManifestation;
+import net.bytebuddy.description.modifier.Visibility;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.DynamicType;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.dynamic.scaffold.InstrumentedType;
+import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
+import net.bytebuddy.implementation.Implementation;
+import net.bytebuddy.implementation.MethodCall;
+import net.bytebuddy.implementation.bind.MethodDelegationBinder;
+import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
+import net.bytebuddy.implementation.bytecode.StackManipulation;
+import net.bytebuddy.implementation.bytecode.Throw;
+import net.bytebuddy.implementation.bytecode.member.FieldAccess;
+import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import net.bytebuddy.implementation.bytecode.member.MethodReturn;
+import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
+import net.bytebuddy.jar.asm.Label;
+import net.bytebuddy.jar.asm.MethodVisitor;
+import net.bytebuddy.jar.asm.Opcodes;
+import net.bytebuddy.matcher.ElementMatchers;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/** Dynamically generates {@link DoFnInvoker} instances for invoking a {@link DoFn}. */
+public class DoFnInvokers {
+ public static final DoFnInvokers INSTANCE = new DoFnInvokers();
+
+ private static final String FN_DELEGATE_FIELD_NAME = "delegate";
+
+ /**
+ * A cache of constructors of generated {@link DoFnInvoker} classes, keyed by {@link DoFn} class.
+ * Needed because generating an invoker class is expensive, and to avoid generating an excessive
+ * number of classes consuming PermGen memory.
+ */
+ private final Map<Class<?>, Constructor<?>> byteBuddyInvokerConstructorCache =
+ new LinkedHashMap<>();
+
+ private DoFnInvokers() {}
+
+ /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
+ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
+ DoFn<InputT, OutputT> fn) {
+ return newByteBuddyInvoker(DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass()), fn);
+ }
+
+ /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
+ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
+ DoFnSignature signature, DoFn<InputT, OutputT> fn) {
+ checkArgument(
+ signature.fnClass().equals(fn.getClass()),
+ "Signature is for class %s, but fn is of class %s",
+ signature.fnClass(),
+ fn.getClass());
+ try {
+ @SuppressWarnings("unchecked")
+ DoFnInvoker<InputT, OutputT> invoker =
+ (DoFnInvoker<InputT, OutputT>)
+ getOrGenerateByteBuddyInvokerConstructor(signature).newInstance(fn);
+ return invoker;
+ } catch (InstantiationException
+ | IllegalAccessException
+ | IllegalArgumentException
+ | InvocationTargetException
+ | SecurityException e) {
+ throw new RuntimeException("Unable to bind invoker for " + fn.getClass(), e);
+ }
+ }
+
+ /**
+ * Returns a generated constructor for a {@link DoFnInvoker} for the given {@link DoFn} class and
+ * caches it.
+ */
+ private synchronized Constructor<?> getOrGenerateByteBuddyInvokerConstructor(
+ DoFnSignature signature) {
+ Class<? extends DoFn> fnClass = signature.fnClass();
+ Constructor<?> constructor = byteBuddyInvokerConstructorCache.get(fnClass);
+ if (constructor == null) {
+ Class<? extends DoFnInvoker<?, ?>> invokerClass = generateInvokerClass(signature);
+ try {
+ constructor = invokerClass.getConstructor(fnClass);
+ } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
+ throw new RuntimeException(e);
+ }
+ byteBuddyInvokerConstructorCache.put(fnClass, constructor);
+ }
+ return constructor;
+ }
+
+ /** Generates a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */
+ private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(DoFnSignature signature) {
+ Class<? extends DoFn> fnClass = signature.fnClass();
+
+ final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass);
+
+ DynamicType.Builder<?> builder =
+ new ByteBuddy()
+ // Create subclasses inside the target class, to have access to
+ // private and package-private bits
+ .with(
+ new NamingStrategy.SuffixingRandom("auxiliary") {
+ @Override
+ public String subclass(TypeDescription.Generic superClass) {
+ return super.name(clazzDescription);
+ }
+ })
+ // Create a subclass of DoFnInvoker
+ .subclass(DoFnInvoker.class, ConstructorStrategy.Default.NO_CONSTRUCTORS)
+ .defineField(
+ FN_DELEGATE_FIELD_NAME, fnClass, Visibility.PRIVATE, FieldManifestation.FINAL)
+ .defineConstructor(Visibility.PUBLIC)
+ .withParameter(fnClass)
+ .intercept(new InvokerConstructor())
+ .method(ElementMatchers.named("invokeProcessElement"))
+ .intercept(new ProcessElementDelegation(signature.processElement()))
+ .method(ElementMatchers.named("invokeStartBundle"))
+ .intercept(
+ signature.startBundle() == null
+ ? new NoopMethodImplementation()
+ : new BundleMethodDelegation(signature.startBundle()))
+ .method(ElementMatchers.named("invokeFinishBundle"))
+ .intercept(
+ signature.finishBundle() == null
+ ? new NoopMethodImplementation()
+ : new BundleMethodDelegation(signature.finishBundle()))
+ .method(ElementMatchers.named("invokeSetup"))
+ .intercept(
+ signature.setup() == null
+ ? new NoopMethodImplementation()
+ : new LifecycleMethodDelegation(signature.setup()))
+ .method(ElementMatchers.named("invokeTeardown"))
+ .intercept(
+ signature.teardown() == null
+ ? new NoopMethodImplementation()
+ : new LifecycleMethodDelegation(signature.teardown()));
+
+ DynamicType.Unloaded<?> unloaded = builder.make();
+
+ @SuppressWarnings("unchecked")
+ Class<? extends DoFnInvoker<?, ?>> res =
+ (Class<? extends DoFnInvoker<?, ?>>)
+ unloaded
+ .load(DoFnInvokers.class.getClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+ .getLoaded();
+ return res;
+ }
+
+ /** Implements an invoker method by doing nothing and immediately returning void. */
+ private static class NoopMethodImplementation implements Implementation {
+ @Override
+ public InstrumentedType prepare(InstrumentedType instrumentedType) {
+ return instrumentedType;
+ }
+
+ @Override
+ public ByteCodeAppender appender(final Target implementationTarget) {
+ return new ByteCodeAppender() {
+ @Override
+ public Size apply(
+ MethodVisitor methodVisitor,
+ Context implementationContext,
+ MethodDescription instrumentedMethod) {
+ StackManipulation manipulation = MethodReturn.VOID;
+ StackManipulation.Size size = manipulation.apply(methodVisitor, implementationContext);
+ return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
+ }
+ };
+ }
+ }
+
+ /**
+ * Base class for implementing an invoker method by delegating to a method of the target {@link
+ * DoFn}.
+ */
+ private abstract static class MethodDelegation implements Implementation {
+ FieldDescription delegateField;
+
+ @Override
+ public InstrumentedType prepare(InstrumentedType instrumentedType) {
+ // Remember the field description of the instrumented type.
+ delegateField =
+ instrumentedType
+ .getDeclaredFields()
+ .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
+ .getOnly();
+
+ // Delegating the method call doesn't require any changes to the instrumented type.
+ return instrumentedType;
+ }
+
+ @Override
+ public ByteCodeAppender appender(final Target implementationTarget) {
+ return new ByteCodeAppender() {
+ @Override
+ public Size apply(
+ MethodVisitor methodVisitor,
+ Context implementationContext,
+ MethodDescription instrumentedMethod) {
+ StackManipulation manipulation =
+ new StackManipulation.Compound(
+ // Push "this" reference to the stack
+ MethodVariableAccess.REFERENCE.loadOffset(0),
+ // Access the delegate field of the the invoker
+ FieldAccess.forField(delegateField).getter(),
+ invokeTargetMethod(instrumentedMethod));
+ StackManipulation.Size size = manipulation.apply(methodVisitor, implementationContext);
+ return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
+ }
+ };
+ }
+
+ /**
+ * Generates code to invoke the target method. When this is called the delegate field will be on
+ * top of the stack. This should add any necessary arguments to the stack and then perform the
+ * method invocation.
+ */
+ protected abstract StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod);
+ }
+
+ /**
+ * Implements the invoker's {@link DoFnInvoker#invokeProcessElement} method by delegating to the
+ * {@link DoFn.ProcessElement} method.
+ */
+ private static final class ProcessElementDelegation extends MethodDelegation {
+ private static final Map<DoFnSignature.ProcessElementMethod.Parameter, MethodDescription>
+ EXTRA_CONTEXT_FACTORY_METHODS;
+
+ static {
+ try {
+ Map<DoFnSignature.ProcessElementMethod.Parameter, MethodDescription> methods =
+ new EnumMap<>(DoFnSignature.ProcessElementMethod.Parameter.class);
+ methods.put(
+ DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW,
+ new MethodDescription.ForLoadedMethod(
+ DoFn.ExtraContextFactory.class.getMethod("window")));
+ methods.put(
+ DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER,
+ new MethodDescription.ForLoadedMethod(
+ DoFn.ExtraContextFactory.class.getMethod("inputProvider")));
+ methods.put(
+ DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER,
+ new MethodDescription.ForLoadedMethod(
+ DoFn.ExtraContextFactory.class.getMethod("outputReceiver")));
+ EXTRA_CONTEXT_FACTORY_METHODS = Collections.unmodifiableMap(methods);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to locate an ExtraContextFactory method that was expected to exist", e);
+ }
+ }
+
+ private final DoFnSignature.ProcessElementMethod signature;
+
+ /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */
+ private ProcessElementDelegation(DoFnSignature.ProcessElementMethod signature) {
+ this.signature = signature;
+ }
+
+ @Override
+ protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
+ MethodDescription targetMethod =
+ new MethodCall.MethodLocator.ForExplicitMethod(
+ new MethodDescription.ForLoadedMethod(signature.targetMethod()))
+ .resolve(instrumentedMethod);
+
+ // Parameters of the wrapper invoker method:
+ // DoFn.ProcessContext, ExtraContextFactory.
+ // Parameters of the wrapped DoFn method:
+ // DoFn.ProcessContext, [BoundedWindow, InputProvider, OutputReceiver] in any order
+ ArrayList<StackManipulation> parameters = new ArrayList<>();
+ // Push the ProcessContext argument.
+ parameters.add(MethodVariableAccess.REFERENCE.loadOffset(1));
+ // Push the extra arguments in their actual order.
+ StackManipulation pushExtraContextFactory = MethodVariableAccess.REFERENCE.loadOffset(2);
+ for (DoFnSignature.ProcessElementMethod.Parameter param : signature.extraParameters()) {
+ parameters.add(
+ new StackManipulation.Compound(
+ pushExtraContextFactory,
+ MethodInvocation.invoke(EXTRA_CONTEXT_FACTORY_METHODS.get(param))));
+ }
+
+ return new StackManipulation.Compound(
+ // Push the parameters
+ new StackManipulation.Compound(parameters),
+ // Invoke the target method
+ wrapWithUserCodeException(
+ MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
+ // Return from the instrumented method
+ MethodReturn.VOID);
+ }
+ }
+
+ /**
+ * Implements {@link DoFnInvoker#invokeStartBundle} or {@link DoFnInvoker#invokeFinishBundle} by
+ * delegating respectively to the {@link StartBundle} and {@link FinishBundle} methods.
+ */
+ private static final class BundleMethodDelegation extends MethodDelegation {
+ private final DoFnSignature.BundleMethod signature;
+
+ private BundleMethodDelegation(@Nullable DoFnSignature.BundleMethod signature) {
+ this.signature = signature;
+ }
+
+ @Override
+ protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
+ MethodDescription targetMethod =
+ new MethodCall.MethodLocator.ForExplicitMethod(
+ new MethodDescription.ForLoadedMethod(checkNotNull(signature).targetMethod()))
+ .resolve(instrumentedMethod);
+ return new StackManipulation.Compound(
+ // Push the parameters
+ MethodVariableAccess.REFERENCE.loadOffset(1),
+ // Invoke the target method
+ wrapWithUserCodeException(
+ MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
+ MethodReturn.VOID);
+ }
+ }
+
+ /**
+ * Implements {@link DoFnInvoker#invokeSetup} or {@link DoFnInvoker#invokeTeardown} by delegating
+ * respectively to the {@link Setup} and {@link Teardown} methods.
+ */
+ private static final class LifecycleMethodDelegation extends MethodDelegation {
+ private final DoFnSignature.LifecycleMethod signature;
+
+ private LifecycleMethodDelegation(@Nullable DoFnSignature.LifecycleMethod signature) {
+ this.signature = signature;
+ }
+
+ @Override
+ protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
+ MethodDescription targetMethod =
+ new MethodCall.MethodLocator.ForExplicitMethod(
+ new MethodDescription.ForLoadedMethod(checkNotNull(signature).targetMethod()))
+ .resolve(instrumentedMethod);
+ return new StackManipulation.Compound(
+ wrapWithUserCodeException(
+ MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
+ MethodReturn.VOID);
+ }
+ }
+
+ /**
+ * Wraps a given stack manipulation in a try catch block. Any exceptions thrown within the try are
+ * wrapped with a {@link UserCodeException}.
+ */
+ private static StackManipulation wrapWithUserCodeException(final StackManipulation tryBody) {
+ final MethodDescription createUserCodeException;
+ try {
+ createUserCodeException =
+ new MethodDescription.ForLoadedMethod(
+ UserCodeException.class.getDeclaredMethod("wrap", Throwable.class));
+ } catch (NoSuchMethodException | SecurityException e) {
+ throw new RuntimeException("Unable to find UserCodeException.wrap", e);
+ }
+
+ return new StackManipulation() {
+ @Override
+ public boolean isValid() {
+ return tryBody.isValid();
+ }
+
+ @Override
+ public Size apply(MethodVisitor mv, Implementation.Context implementationContext) {
+ Label tryBlockStart = new Label();
+ Label tryBlockEnd = new Label();
+ Label catchBlockStart = new Label();
+ Label catchBlockEnd = new Label();
+
+ String throwableName = new TypeDescription.ForLoadedType(Throwable.class).getInternalName();
+ mv.visitTryCatchBlock(tryBlockStart, tryBlockEnd, catchBlockStart, throwableName);
+
+ // The try block attempts to perform the expected operations, then jumps to success
+ mv.visitLabel(tryBlockStart);
+ Size trySize = tryBody.apply(mv, implementationContext);
+ mv.visitJumpInsn(Opcodes.GOTO, catchBlockEnd);
+ mv.visitLabel(tryBlockEnd);
+
+ // The handler wraps the exception, and then throws.
+ mv.visitLabel(catchBlockStart);
+ // Add the exception to the frame
+ mv.visitFrame(
+ Opcodes.F_SAME1,
+ // No local variables
+ 0,
+ new Object[] {},
+ // 1 stack element (the throwable)
+ 1,
+ new Object[] {throwableName});
+
+ Size catchSize =
+ new Compound(MethodInvocation.invoke(createUserCodeException), Throw.INSTANCE)
+ .apply(mv, implementationContext);
+
+ mv.visitLabel(catchBlockEnd);
+ // The frame contents after the try/catch block is the same
+ // as it was before.
+ mv.visitFrame(
+ Opcodes.F_SAME,
+ // No local variables
+ 0,
+ new Object[] {},
+ // No new stack variables
+ 0,
+ new Object[] {});
+
+ return new Size(
+ trySize.getSizeImpact(),
+ Math.max(trySize.getMaximalSize(), catchSize.getMaximalSize()));
+ }
+ };
+ }
+
+ /**
+ * A constructor {@link Implementation} for a {@link DoFnInvoker class}. Produces the byte code
+ * for a constructor that takes a single argument and assigns it to the delegate field.
+ */
+ private static final class InvokerConstructor implements Implementation {
+ @Override
+ public InstrumentedType prepare(InstrumentedType instrumentedType) {
+ return instrumentedType;
+ }
+
+ @Override
+ public ByteCodeAppender appender(final Target implementationTarget) {
+ return new ByteCodeAppender() {
+ @Override
+ public Size apply(
+ MethodVisitor methodVisitor,
+ Context implementationContext,
+ MethodDescription instrumentedMethod) {
+ StackManipulation.Size size =
+ new StackManipulation.Compound(
+ // Load the this reference
+ MethodVariableAccess.REFERENCE.loadOffset(0),
+ // Invoke the super constructor (default constructor of Object)
+ MethodInvocation.invoke(
+ new TypeDescription.ForLoadedType(Object.class)
+ .getDeclaredMethods()
+ .filter(
+ ElementMatchers.isConstructor()
+ .and(ElementMatchers.takesArguments(0)))
+ .getOnly()),
+ // Load the this reference
+ MethodVariableAccess.REFERENCE.loadOffset(0),
+ // Load the delegate argument
+ MethodVariableAccess.REFERENCE.loadOffset(1),
+ // Assign the delegate argument to the delegate field
+ FieldAccess.forField(
+ implementationTarget
+ .getInstrumentedType()
+ .getDeclaredFields()
+ .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
+ .getOnly())
+ .putter(),
+ // Return void.
+ MethodReturn.VOID)
+ .apply(methodVisitor, implementationContext);
+ return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
+ }
+ };
+ }
+ }
+}