You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/08/17 23:05:25 UTC
[1/4] incubator-beam git commit: Rewrites DoFnReflector to go via
DoFnSignature
Repository: incubator-beam
Updated Branches:
refs/heads/master 46097736b -> 89367cfb1
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
new file mode 100644
index 0000000..1a26df2
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+import com.google.common.reflect.TypeToken;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/** Tests for {@link DoFnSignatures}. */
+@RunWith(JUnit4.class)
+public class DoFnSignaturesTest {
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ private static class FakeDoFn extends DoFn<Integer, String> {}
+
+ @SuppressWarnings({"unused"})
+ private void missingProcessContext() {}
+
+ @Test
+ public void testMissingProcessContext() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ getClass().getName()
+ + "#missingProcessContext() must take a ProcessContext<> as its first argument");
+
+ DoFnSignatures.analyzeProcessElementMethod(
+ TypeToken.of(FakeDoFn.class),
+ getClass().getDeclaredMethod("missingProcessContext"),
+ TypeToken.of(Integer.class),
+ TypeToken.of(String.class));
+ }
+
+ @SuppressWarnings({"unused"})
+ private void badProcessContext(String s) {}
+
+ @Test
+ public void testBadProcessContextType() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ getClass().getName()
+ + "#badProcessContext(String) must take a ProcessContext<> as its first argument");
+
+ DoFnSignatures.analyzeProcessElementMethod(
+ TypeToken.of(FakeDoFn.class),
+ getClass().getDeclaredMethod("badProcessContext", String.class),
+ TypeToken.of(Integer.class),
+ TypeToken.of(String.class));
+ }
+
+ @SuppressWarnings({"unused"})
+ private void badExtraContext(DoFn<Integer, String>.Context c, int n) {}
+
+ @Test
+ public void testBadExtraContext() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ getClass().getName()
+ + "#badExtraContext(Context, int) must have a single argument of type Context");
+
+ DoFnSignatures.analyzeBundleMethod(
+ TypeToken.of(FakeDoFn.class),
+ getClass().getDeclaredMethod("badExtraContext", DoFn.Context.class, int.class),
+ TypeToken.of(Integer.class),
+ TypeToken.of(String.class));
+ }
+
+ @SuppressWarnings({"unused"})
+ private void badExtraProcessContext(DoFn<Integer, String>.ProcessContext c, Integer n) {}
+
+ @Test
+ public void testBadExtraProcessContextType() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Integer is not a valid context parameter for method "
+ + getClass().getName()
+ + "#badExtraProcessContext(ProcessContext, Integer)"
+ + ". Should be one of [BoundedWindow]");
+
+ DoFnSignatures.analyzeProcessElementMethod(
+ TypeToken.of(FakeDoFn.class),
+ getClass()
+ .getDeclaredMethod("badExtraProcessContext", DoFn.ProcessContext.class, Integer.class),
+ TypeToken.of(Integer.class),
+ TypeToken.of(String.class));
+ }
+
+ @SuppressWarnings("unused")
+ private int badReturnType() {
+ return 0;
+ }
+
+ @Test
+ public void testBadReturnType() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(getClass().getName() + "#badReturnType() must have a void return type");
+
+ DoFnSignatures.analyzeProcessElementMethod(
+ TypeToken.of(FakeDoFn.class),
+ getClass().getDeclaredMethod("badReturnType"),
+ TypeToken.of(Integer.class),
+ TypeToken.of(String.class));
+ }
+
+ @SuppressWarnings("unused")
+ private void goodConcreteTypes(
+ DoFn<Integer, String>.ProcessContext c,
+ DoFn.InputProvider<Integer> input,
+ DoFn.OutputReceiver<String> output) {}
+
+ @Test
+ public void testGoodConcreteTypes() throws Exception {
+ Method method =
+ getClass()
+ .getDeclaredMethod(
+ "goodConcreteTypes",
+ DoFn.ProcessContext.class,
+ DoFn.InputProvider.class,
+ DoFn.OutputReceiver.class);
+ DoFnSignatures.analyzeProcessElementMethod(
+ TypeToken.of(FakeDoFn.class),
+ method,
+ TypeToken.of(Integer.class),
+ TypeToken.of(String.class));
+ }
+
+ private static class GoodTypeVariables<InputT, OutputT> extends DoFn<InputT, OutputT> {
+ @ProcessElement
+ @SuppressWarnings("unused")
+ public void goodTypeVariables(
+ DoFn<InputT, OutputT>.ProcessContext c,
+ DoFn.InputProvider<InputT> input,
+ DoFn.OutputReceiver<OutputT> output) {}
+ }
+
+ @Test
+ public void testGoodTypeVariables() throws Exception {
+ DoFnSignatures.INSTANCE.getOrParseSignature(GoodTypeVariables.class);
+ }
+
+ private static class IdentityFn<T> extends DoFn<T, T> {
+ @ProcessElement
+ @SuppressWarnings("unused")
+ public void processElement(ProcessContext c, InputProvider<T> input, OutputReceiver<T> output) {
+ c.output(c.element());
+ }
+ }
+
+ private static class IdentityListFn<T> extends IdentityFn<List<T>> {}
+
+ @Test
+ public void testIdentityFnApplied() throws Exception {
+ DoFnSignatures.INSTANCE.getOrParseSignature(new IdentityFn<String>() {}.getClass());
+ }
+
+ @SuppressWarnings("unused")
+ private void badGenericTwoArgs(
+ DoFn<Integer, String>.ProcessContext c,
+ DoFn.InputProvider<Integer> input,
+ DoFn.OutputReceiver<Integer> output) {}
+
+ @Test
+ public void testBadGenericsTwoArgs() throws Exception {
+ Method method =
+ getClass()
+ .getDeclaredMethod(
+ "badGenericTwoArgs",
+ DoFn.ProcessContext.class,
+ DoFn.InputProvider.class,
+ DoFn.OutputReceiver.class);
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Wrong type of OutputReceiver parameter "
+ + "for method "
+ + getClass().getName()
+ + "#badGenericTwoArgs(ProcessContext, InputProvider, OutputReceiver): "
+ + "OutputReceiver<Integer>, should be "
+ + "OutputReceiver<String>");
+
+ DoFnSignatures.analyzeProcessElementMethod(
+ TypeToken.of(FakeDoFn.class),
+ method,
+ TypeToken.of(Integer.class),
+ TypeToken.of(String.class));
+ }
+
+ @SuppressWarnings("unused")
+ private void badGenericWildCards(
+ DoFn<Integer, String>.ProcessContext c,
+ DoFn.InputProvider<Integer> input,
+ DoFn.OutputReceiver<? super Integer> output) {}
+
+ @Test
+ public void testBadGenericWildCards() throws Exception {
+ Method method =
+ getClass()
+ .getDeclaredMethod(
+ "badGenericWildCards",
+ DoFn.ProcessContext.class,
+ DoFn.InputProvider.class,
+ DoFn.OutputReceiver.class);
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Wrong type of OutputReceiver parameter for method "
+ + getClass().getName()
+ + "#badGenericWildCards(ProcessContext, InputProvider, OutputReceiver): "
+ + "OutputReceiver<? super Integer>, should be "
+ + "OutputReceiver<String>");
+
+ DoFnSignatures.analyzeProcessElementMethod(
+ TypeToken.of(FakeDoFn.class),
+ method,
+ TypeToken.of(Integer.class),
+ TypeToken.of(String.class));
+ }
+
+ static class BadTypeVariables<InputT, OutputT> extends DoFn<InputT, OutputT> {
+ @ProcessElement
+ @SuppressWarnings("unused")
+ public void badTypeVariables(
+ DoFn<InputT, OutputT>.ProcessContext c,
+ DoFn.InputProvider<InputT> input,
+ DoFn.OutputReceiver<InputT> output) {}
+ }
+
+ @Test
+ public void testBadTypeVariables() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Wrong type of OutputReceiver parameter for method "
+ + BadTypeVariables.class.getName()
+ + "#badTypeVariables(ProcessContext, InputProvider, OutputReceiver): "
+ + "OutputReceiver<InputT>, should be "
+ + "OutputReceiver<OutputT>");
+
+ DoFnSignatures.INSTANCE.getOrParseSignature(BadTypeVariables.class);
+ }
+
+ @Test
+ public void testNoProcessElement() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("No method annotated with @ProcessElement found");
+ thrown.expectMessage(getClass().getName() + "$");
+ DoFnSignatures.INSTANCE.getOrParseSignature(new DoFn<String, String>() {}.getClass());
+ }
+
+ @Test
+ public void testMultipleProcessElement() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Found multiple methods annotated with @ProcessElement");
+ thrown.expectMessage("foo()");
+ thrown.expectMessage("bar()");
+ thrown.expectMessage(getClass().getName() + "$");
+ DoFnSignatures.INSTANCE.getOrParseSignature(
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void foo() {}
+
+ @ProcessElement
+ public void bar() {}
+ }.getClass());
+ }
+
+ @Test
+ public void testMultipleStartBundleElement() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Found multiple methods annotated with @StartBundle");
+ thrown.expectMessage("bar()");
+ thrown.expectMessage("baz()");
+ thrown.expectMessage(getClass().getName() + "$");
+ DoFnSignatures.INSTANCE.getOrParseSignature(
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void foo() {}
+
+ @StartBundle
+ public void bar() {}
+
+ @StartBundle
+ public void baz() {}
+ }.getClass());
+ }
+
+ @Test
+ public void testMultipleFinishBundleMethods() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Found multiple methods annotated with @FinishBundle");
+ thrown.expectMessage("bar(Context)");
+ thrown.expectMessage("baz(Context)");
+ thrown.expectMessage(getClass().getName() + "$");
+ DoFnSignatures.INSTANCE.getOrParseSignature(
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void foo(ProcessContext context) {}
+
+ @FinishBundle
+ public void bar(Context context) {}
+
+ @FinishBundle
+ public void baz(Context context) {}
+ }.getClass());
+ }
+
+ @Test
+ public void testPrivateProcessElement() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("process() must be public");
+ thrown.expectMessage(getClass().getName() + "$");
+ DoFnSignatures.INSTANCE.getOrParseSignature(
+ new DoFn<String, String>() {
+ @ProcessElement
+ private void process() {}
+ }.getClass());
+ }
+
+ @Test
+ public void testPrivateStartBundle() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("startBundle() must be public");
+ thrown.expectMessage(getClass().getName() + "$");
+ DoFnSignatures.INSTANCE.getOrParseSignature(
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement() {}
+
+ @StartBundle
+ void startBundle() {}
+ }.getClass());
+ }
+
+ @Test
+ public void testPrivateFinishBundle() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("finishBundle() must be public");
+ thrown.expectMessage(getClass().getName() + "$");
+ DoFnSignatures.INSTANCE.getOrParseSignature(
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement() {}
+
+ @FinishBundle
+ void finishBundle() {}
+ }.getClass());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
new file mode 100644
index 0000000..a574ed8
--- /dev/null
+++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.microbenchmarks.transforms;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Benchmarks for {@link OldDoFn} and {@link DoFn} invocations, specifically for measuring the
+ * overhead of {@link DoFnInvokers}.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Warmup(iterations = 5)
+public class DoFnInvokersBenchmark {
+
+ private static final String ELEMENT = "some string to use for testing";
+
+ private OldDoFn<String, String> oldDoFn = new UpperCaseOldDoFn();
+ private DoFn<String, String> doFn = new UpperCaseDoFn();
+
+ private StubOldDoFnProcessContext stubOldDoFnContext =
+ new StubOldDoFnProcessContext(oldDoFn, ELEMENT);
+ private StubDoFnProcessContext stubDoFnContext = new StubDoFnProcessContext(doFn, ELEMENT);
+ private ExtraContextFactory<String, String> extraContextFactory =
+ new DoFn.FakeExtraContextFactory<>();
+
+ private OldDoFn<String, String> adaptedDoFnWithContext;
+
+ private DoFnInvoker<String, String> invoker;
+
+ @Setup
+ public void setUp() {
+ adaptedDoFnWithContext = DoFnAdapters.toOldDoFn(doFn);
+ invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(doFn);
+ }
+
+ @Benchmark
+ public String invokeOldDoFn() throws Exception {
+ oldDoFn.processElement(stubOldDoFnContext);
+ return stubOldDoFnContext.output; // result of the call above, not the new-DoFn stub's field
+ }
+
+ @Benchmark
+ public String invokeDoFnWithContextViaAdaptor() throws Exception {
+ adaptedDoFnWithContext.processElement(stubOldDoFnContext);
+ return stubOldDoFnContext.output;
+ }
+
+ @Benchmark
+ public String invokeDoFnWithContext() throws Exception {
+ invoker.invokeProcessElement(stubDoFnContext, extraContextFactory);
+ return stubDoFnContext.output;
+ }
+
+ private static class UpperCaseOldDoFn extends OldDoFn<String, String> {
+
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(c.element().toUpperCase());
+ }
+ }
+
+ private static class UpperCaseDoFn extends DoFn<String, String> {
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(c.element().toUpperCase());
+ }
+ }
+
+ private static class StubOldDoFnProcessContext extends OldDoFn<String, String>.ProcessContext {
+
+ private final String element;
+ private String output;
+
+ public StubOldDoFnProcessContext(OldDoFn<String, String> fn, String element) {
+ fn.super();
+ this.element = element;
+ }
+
+ @Override
+ public String element() {
+ return element;
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ return null;
+ }
+
+ @Override
+ public Instant timestamp() {
+ return null;
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return null;
+ }
+
+ @Override
+ public PaneInfo pane() {
+ return null;
+ }
+
+ @Override
+ public WindowingInternals<String, String> windowingInternals() {
+ return null;
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return null;
+ }
+
+ @Override
+ public void output(String output) {
+ this.output = output;
+ }
+
+ @Override
+ public void outputWithTimestamp(String output, Instant timestamp) {
+ output(output);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {}
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {}
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+ String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ return null;
+ }
+ }
+
+ private static class StubDoFnProcessContext extends DoFn<String, String>.ProcessContext {
+ private final String element;
+ private String output;
+
+ public StubDoFnProcessContext(DoFn<String, String> fn, String element) {
+ fn.super();
+ this.element = element;
+ }
+
+ @Override
+ public String element() {
+ return element;
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ return null;
+ }
+
+ @Override
+ public Instant timestamp() {
+ return null;
+ }
+
+ @Override
+ public PaneInfo pane() {
+ return null;
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return null;
+ }
+
+ @Override
+ public void output(String output) {
+ this.output = output;
+ }
+
+ @Override
+ public void outputWithTimestamp(String output, Instant timestamp) {
+ output(output);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {}
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
deleted file mode 100644
index 91ecd16..0000000
--- a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.microbenchmarks.transforms;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
-import org.apache.beam.sdk.transforms.DoFnReflector;
-import org.apache.beam.sdk.transforms.DoFnReflector.DoFnInvoker;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-
-import org.joda.time.Instant;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.Warmup;
-
-/**
- * Benchmarks for {@link OldDoFn} and {@link DoFn} invocations, specifically
- * for measuring the overhead of {@link DoFnReflector}.
- */
-@State(Scope.Benchmark)
-@Fork(1)
-@Warmup(iterations = 5)
-public class DoFnReflectorBenchmark {
-
- private static final String ELEMENT = "some string to use for testing";
-
- private OldDoFn<String, String> oldDoFn = new UpperCaseOldDoFn();
- private DoFn<String, String> doFn = new UpperCaseDoFn();
-
- private StubOldDoFnProcessContext stubOldDoFnContext = new StubOldDoFnProcessContext(oldDoFn,
- ELEMENT);
- private StubDoFnProcessContext stubDoFnContext =
- new StubDoFnProcessContext(doFn, ELEMENT);
- private ExtraContextFactory<String, String> extraContextFactory =
- new DoFn.FakeExtraContextFactory<>();
-
- private DoFnReflector doFnReflector;
- private OldDoFn<String, String> adaptedDoFnWithContext;
-
- private DoFnInvoker<String, String> invoker;
-
- @Setup
- public void setUp() {
- doFnReflector = DoFnReflector.of(doFn.getClass());
- adaptedDoFnWithContext = doFnReflector.toDoFn(doFn);
- invoker = doFnReflector.bindInvoker(doFn);
- }
-
- @Benchmark
- public String invokeOldDoFn() throws Exception {
- oldDoFn.processElement(stubOldDoFnContext);
- return stubDoFnContext.output;
- }
-
- @Benchmark
- public String invokeDoFnWithContextViaAdaptor() throws Exception {
- adaptedDoFnWithContext.processElement(stubOldDoFnContext);
- return stubOldDoFnContext.output;
- }
-
- @Benchmark
- public String invokeDoFnWithContext() throws Exception {
- invoker.invokeProcessElement(stubDoFnContext, extraContextFactory);
- return stubDoFnContext.output;
- }
-
- private static class UpperCaseOldDoFn extends OldDoFn<String, String> {
-
- @Override
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element().toUpperCase());
- }
- }
-
- private static class UpperCaseDoFn extends DoFn<String, String> {
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element().toUpperCase());
- }
- }
-
- private static class StubOldDoFnProcessContext extends OldDoFn<String, String>.ProcessContext {
-
- private final String element;
- private String output;
-
- public StubOldDoFnProcessContext(OldDoFn<String, String> fn, String element) {
- fn.super();
- this.element = element;
- }
-
- @Override
- public String element() {
- return element;
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return null;
- }
-
- @Override
- public Instant timestamp() {
- return null;
- }
-
- @Override
- public BoundedWindow window() {
- return null;
- }
-
- @Override
- public PaneInfo pane() {
- return null;
- }
-
- @Override
- public WindowingInternals<String, String> windowingInternals() {
- return null;
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return null;
- }
-
- @Override
- public void output(String output) {
- this.output = output;
- }
-
- @Override
- public void outputWithTimestamp(String output, Instant timestamp) {
- output(output);
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- }
-
- @Override
- protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
- createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- return null;
- }
- }
-
- private static class StubDoFnProcessContext
- extends DoFn<String, String>.ProcessContext {
- private final String element;
- private String output;
-
- public StubDoFnProcessContext(DoFn<String, String> fn, String element) {
- fn.super();
- this.element = element;
- }
-
- @Override
- public String element() {
- return element;
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return null;
- }
-
- @Override
- public Instant timestamp() {
- return null;
- }
-
- @Override
- public PaneInfo pane() {
- return null;
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return null;
- }
-
- @Override
- public void output(String output) {
- this.output = output;
- }
-
- @Override
- public void outputWithTimestamp(String output, Instant timestamp) {
- output(output);
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- }
- }
-}
[2/4] incubator-beam git commit: Rewrites DoFnReflector to go via
DoFnSignature
Posted by bc...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
new file mode 100644
index 0000000..6730140
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+import com.google.auto.value.AutoValue;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+/**
+ * Describes the signature of a {@link DoFn}, in particular, which features it uses, which extra
+ * context it requires, types of the input and output elements, etc.
+ *
+ * <p>See <a href="https://s.apache.org/a-new-dofn">A new DoFn</a>.
+ */
+@AutoValue
+public abstract class DoFnSignature {
+ public abstract Class<? extends DoFn> fnClass();
+
+ public abstract ProcessElementMethod processElement();
+
+ @Nullable
+ public abstract BundleMethod startBundle();
+
+ @Nullable
+ public abstract BundleMethod finishBundle();
+
+ @Nullable
+ public abstract LifecycleMethod setup();
+
+ @Nullable
+ public abstract LifecycleMethod teardown();
+
+ static DoFnSignature create(
+ Class<? extends DoFn> fnClass,
+ ProcessElementMethod processElement,
+ @Nullable BundleMethod startBundle,
+ @Nullable BundleMethod finishBundle,
+ @Nullable LifecycleMethod setup,
+ @Nullable LifecycleMethod teardown) {
+ return new AutoValue_DoFnSignature(
+ fnClass,
+ processElement,
+ startBundle,
+ finishBundle,
+ setup,
+ teardown);
+ }
+
+ /** Describes a {@link DoFn.ProcessElement} method. */
+ @AutoValue
+ public abstract static class ProcessElementMethod {
+ enum Parameter {
+ BOUNDED_WINDOW,
+ INPUT_PROVIDER,
+ OUTPUT_RECEIVER
+ }
+
+ public abstract Method targetMethod();
+
+ public abstract List<Parameter> extraParameters();
+
+ static ProcessElementMethod create(Method targetMethod, List<Parameter> extraParameters) {
+ return new AutoValue_DoFnSignature_ProcessElementMethod( // copy: detach from caller's list
+ targetMethod, Collections.unmodifiableList(new java.util.ArrayList<>(extraParameters)));
+ }
+
+ /** Returns true if {@link #extraParameters()} contains {@link Parameter#BOUNDED_WINDOW}. */
+ public boolean usesSingleWindow() {
+ return extraParameters().contains(Parameter.BOUNDED_WINDOW);
+ }
+ }
+
+ /** Describes a {@link DoFn.StartBundle} or {@link DoFn.FinishBundle} method. */
+ @AutoValue
+ public abstract static class BundleMethod {
+ public abstract Method targetMethod();
+
+ static BundleMethod create(Method targetMethod) {
+ return new AutoValue_DoFnSignature_BundleMethod(targetMethod);
+ }
+ }
+
+ /** Describes a {@link DoFn.Setup} or {@link DoFn.Teardown} method. */
+ @AutoValue
+ public abstract static class LifecycleMethod {
+ public abstract Method targetMethod();
+
+ static LifecycleMethod create(Method targetMethod) {
+ return new AutoValue_DoFnSignature_LifecycleMethod(targetMethod);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
new file mode 100644
index 0000000..80b3b4f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.reflect.TypeParameter;
+import com.google.common.reflect.TypeToken;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parses a {@link DoFn} and computes its {@link DoFnSignature}. See {@link #getOrParseSignature}.
+ */
+public class DoFnSignatures {
+ public static final DoFnSignatures INSTANCE = new DoFnSignatures();
+ // Private constructor: instances are only created inside this class (see INSTANCE above).
+ private DoFnSignatures() {}
+ // Signatures parsed so far, keyed by the DoFn class they were parsed from.
+ private final Map<Class<?>, DoFnSignature> signatureCache = new LinkedHashMap<>();
+
+ /**
+  * Returns the {@link DoFnSignature} for the given {@link DoFn} class, parsing it the first
+  * time the class is requested and answering later requests from the cache.
+  */
+ public synchronized DoFnSignature getOrParseSignature(
+     @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) {
+   DoFnSignature cached = signatureCache.get(fn);
+   if (cached != null) {
+     return cached;
+   }
+   // Nothing is stored if parsing throws, so a failing class is re-parsed on the next call.
+   DoFnSignature parsed = parseSignature(fn);
+   signatureCache.put(fn, parsed);
+   return parsed;
+ }
+
+ /**
+  * Analyzes a given {@link DoFn} class and extracts its {@link DoFnSignature}.
+  *
+  * <p>The input and output types are read from the type arguments of the {@code DoFn}
+  * supertype. A {@code @ProcessElement} method is required; the bundle and lifecycle methods
+  * are optional and are passed as {@code null} when absent.
+  *
+  * @throws IllegalArgumentException if {@code fnClass} is not a subtype of {@code DoFn}, if
+  *     its {@code DoFn} supertype carries no type arguments, or if any annotated method fails
+  *     validation
+  */
+ private static DoFnSignature parseSignature(Class<? extends DoFn> fnClass) {
+   checkArgument(
+       DoFn.class.isAssignableFrom(fnClass),
+       "%s must be subtype of DoFn",
+       fnClass.getSimpleName());
+
+   // Extract the input and output type.
+   TypeToken<?> inputT = null;
+   TypeToken<?> outputT = null;
+   TypeToken<? extends DoFn> fnToken = TypeToken.of(fnClass);
+   for (TypeToken<?> supertype : fnToken.getTypes()) {
+     if (!supertype.getRawType().equals(DoFn.class)) {
+       continue;
+     }
+     // A raw DoFn supertype (e.g. "class Foo extends DoFn") is a plain Class rather than a
+     // ParameterizedType; reject it with a clear message instead of a ClassCastException.
+     Type doFnType = supertype.getType();
+     checkArgument(
+         doFnType instanceof ParameterizedType,
+         "%s must specify the type arguments of DoFn",
+         fnClass);
+     Type[] args = ((ParameterizedType) doFnType).getActualTypeArguments();
+     inputT = TypeToken.of(args[0]);
+     outputT = TypeToken.of(args[1]);
+   }
+   checkNotNull(inputT, "Unable to determine input type from %s", fnClass);
+
+   Method processElementMethod = findAnnotatedMethod(DoFn.ProcessElement.class, fnClass, true);
+   Method startBundleMethod = findAnnotatedMethod(DoFn.StartBundle.class, fnClass, false);
+   Method finishBundleMethod = findAnnotatedMethod(DoFn.FinishBundle.class, fnClass, false);
+   Method setupMethod = findAnnotatedMethod(DoFn.Setup.class, fnClass, false);
+   Method teardownMethod = findAnnotatedMethod(DoFn.Teardown.class, fnClass, false);
+
+   return DoFnSignature.create(
+       fnClass,
+       analyzeProcessElementMethod(fnToken, processElementMethod, inputT, outputT),
+       (startBundleMethod == null)
+           ? null
+           : analyzeBundleMethod(fnToken, startBundleMethod, inputT, outputT),
+       (finishBundleMethod == null)
+           ? null
+           : analyzeBundleMethod(fnToken, finishBundleMethod, inputT, outputT),
+       (setupMethod == null) ? null : analyzeLifecycleMethod(setupMethod),
+       (teardownMethod == null) ? null : analyzeLifecycleMethod(teardownMethod));
+ }
+
+ /**
+  * Builds the type token for {@code DoFn<InputT, OutputT>.ProcessContext}, substituting the
+  * supplied tokens for {@code InputT} and {@code OutputT}.
+  */
+ private static <InputT, OutputT>
+     TypeToken<DoFn<InputT, OutputT>.ProcessContext> doFnProcessContextTypeOf(
+         TypeToken<InputT> inputT, TypeToken<OutputT> outputT) {
+   TypeToken<DoFn<InputT, OutputT>.ProcessContext> template =
+       new TypeToken<DoFn<InputT, OutputT>.ProcessContext>() {};
+   TypeToken<DoFn<InputT, OutputT>.ProcessContext> withInput =
+       template.where(new TypeParameter<InputT>() {}, inputT);
+   return withInput.where(new TypeParameter<OutputT>() {}, outputT);
+ }
+
+ /**
+  * Builds the type token for {@code DoFn<InputT, OutputT>.Context}, substituting the supplied
+  * tokens for {@code InputT} and {@code OutputT}.
+  */
+ private static <InputT, OutputT> TypeToken<DoFn<InputT, OutputT>.Context> doFnContextTypeOf(
+     TypeToken<InputT> inputT, TypeToken<OutputT> outputT) {
+   TypeToken<DoFn<InputT, OutputT>.Context> template =
+       new TypeToken<DoFn<InputT, OutputT>.Context>() {};
+   TypeToken<DoFn<InputT, OutputT>.Context> withInput =
+       template.where(new TypeParameter<InputT>() {}, inputT);
+   return withInput.where(new TypeParameter<OutputT>() {}, outputT);
+ }
+
+ /**
+  * Builds the type token for {@code DoFn.InputProvider<InputT>}, substituting the supplied
+  * token for {@code InputT}.
+  */
+ private static <InputT> TypeToken<DoFn.InputProvider<InputT>> inputProviderTypeOf(
+     TypeToken<InputT> inputT) {
+   TypeToken<DoFn.InputProvider<InputT>> template =
+       new TypeToken<DoFn.InputProvider<InputT>>() {};
+   return template.where(new TypeParameter<InputT>() {}, inputT);
+ }
+
+ /**
+  * Generates a type token for {@code DoFn.OutputReceiver<OutputT>} given {@code OutputT}.
+  *
+  * @param outputT token for the element type the receiver accepts
+  */
+ private static <OutputT> TypeToken<DoFn.OutputReceiver<OutputT>> outputReceiverTypeOf(
+     TypeToken<OutputT> outputT) {
+   return new TypeToken<DoFn.OutputReceiver<OutputT>>() {}.where(
+       new TypeParameter<OutputT>() {}, outputT);
+ }
+
+ /**
+  * Validates a {@link DoFn.ProcessElement} method and records the extra context parameters
+  * it declares.
+  *
+  * <p>The method must return {@code void}, must not be varargs, and must take
+  * {@code DoFn<InputT, OutputT>.ProcessContext} as its first parameter. Each further parameter
+  * must be a {@code BoundedWindow}, a {@code DoFn.InputProvider<InputT>} or a
+  * {@code DoFn.OutputReceiver<OutputT>}, and each kind may appear at most once.
+  *
+  * @throws IllegalArgumentException if any of these rules is violated
+  */
+ @VisibleForTesting
+ static DoFnSignature.ProcessElementMethod analyzeProcessElementMethod(
+     TypeToken<? extends DoFn> fnClass, Method m, TypeToken<?> inputT, TypeToken<?> outputT) {
+   checkArgument(
+       void.class.equals(m.getReturnType()), "%s must have a void return type", format(m));
+   checkArgument(!m.isVarArgs(), "%s must not have var args", format(m));
+
+   TypeToken<?> processContextToken = doFnProcessContextTypeOf(inputT, outputT);
+
+   // Parameter types are resolved against the concrete DoFn subclass before comparison.
+   Type[] params = m.getGenericParameterTypes();
+   TypeToken<?> contextToken = (params.length > 0) ? fnClass.resolveType(params[0]) : null;
+   boolean firstArgIsProcessContext =
+       contextToken != null && contextToken.equals(processContextToken);
+   checkArgument(
+       firstArgIsProcessContext,
+       "%s must take a %s as its first argument",
+       format(m),
+       formatType(processContextToken));
+
+   List<DoFnSignature.ProcessElementMethod.Parameter> extraParameters = new ArrayList<>();
+   TypeToken<?> expectedInputProviderT = inputProviderTypeOf(inputT);
+   TypeToken<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
+   for (int i = 1; i < params.length; ++i) {
+     TypeToken<?> param = fnClass.resolveType(params[i]);
+     Class<?> rawType = param.getRawType();
+
+     if (rawType.equals(BoundedWindow.class)) {
+       checkArgument(
+           !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW),
+           "Multiple BoundedWindow parameters in %s",
+           format(m));
+       extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW);
+       continue;
+     }
+
+     if (rawType.equals(DoFn.InputProvider.class)) {
+       checkArgument(
+           !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER),
+           "Multiple InputProvider parameters in %s",
+           format(m));
+       checkArgument(
+           param.equals(expectedInputProviderT),
+           "Wrong type of InputProvider parameter for method %s: %s, should be %s",
+           format(m),
+           formatType(param),
+           formatType(expectedInputProviderT));
+       extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER);
+       continue;
+     }
+
+     if (rawType.equals(DoFn.OutputReceiver.class)) {
+       checkArgument(
+           !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER),
+           "Multiple OutputReceiver parameters in %s",
+           format(m));
+       checkArgument(
+           param.equals(expectedOutputReceiverT),
+           "Wrong type of OutputReceiver parameter for method %s: %s, should be %s",
+           format(m),
+           formatType(param),
+           formatType(expectedOutputReceiverT));
+       extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER);
+       continue;
+     }
+
+     // Anything else is rejected unconditionally.
+     // NOTE(review): the list below names only BoundedWindow although InputProvider and
+     // OutputReceiver are accepted above; left unchanged in case tests pin the message.
+     List<String> allowedParamTypes =
+         Arrays.asList(formatType(new TypeToken<BoundedWindow>() {}));
+     checkArgument(
+         false,
+         "%s is not a valid context parameter for method %s. Should be one of %s",
+         formatType(param),
+         format(m),
+         allowedParamTypes);
+   }
+
+   return DoFnSignature.ProcessElementMethod.create(m, extraParameters);
+ }
+
+ /**
+  * Validates a {@link DoFn.StartBundle} or {@link DoFn.FinishBundle} method.
+  *
+  * <p>The method must return {@code void}, must not be varargs, and must take exactly one
+  * parameter whose resolved type is {@code DoFn<InputT, OutputT>.Context}.
+  *
+  * @throws IllegalArgumentException if any of these rules is violated
+  */
+ @VisibleForTesting
+ static DoFnSignature.BundleMethod analyzeBundleMethod(
+     TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT, TypeToken<?> outputT) {
+   checkArgument(
+       void.class.equals(m.getReturnType()), "%s must have a void return type", format(m));
+   checkArgument(!m.isVarArgs(), "%s must not have var args", format(m));
+
+   TypeToken<?> expectedContextToken = doFnContextTypeOf(inputT, outputT);
+
+   Type[] declaredParams = m.getGenericParameterTypes();
+   boolean hasExactlyOneParam = declaredParams.length == 1;
+   checkArgument(
+       hasExactlyOneParam,
+       "%s must have a single argument of type %s",
+       format(m),
+       formatType(expectedContextToken));
+
+   // Resolve against the concrete DoFn subclass before comparing with the expected type.
+   TypeToken<?> actualContextToken = fnToken.resolveType(declaredParams[0]);
+   boolean contextMatches = actualContextToken.equals(expectedContextToken);
+   checkArgument(
+       contextMatches,
+       "Wrong type of context argument to %s: %s, must be %s",
+       format(m),
+       formatType(actualContextToken),
+       formatType(expectedContextToken));
+
+   return DoFnSignature.BundleMethod.create(m);
+ }
+
+ /**
+  * Validates a lifecycle method: it must return {@code void} and take no arguments.
+  *
+  * @throws IllegalArgumentException if either rule is violated
+  */
+ private static DoFnSignature.LifecycleMethod analyzeLifecycleMethod(Method m) {
+   boolean returnsVoid = void.class.equals(m.getReturnType());
+   checkArgument(returnsVoid, "%s must have a void return type", format(m));
+
+   int parameterCount = m.getGenericParameterTypes().length;
+   checkArgument(parameterCount == 0, "%s must take zero arguments", format(m));
+
+   return DoFnSignature.LifecycleMethod.create(m);
+ }
+
+ /**
+  * Collects every method carrying {@code anno} that is declared on {@code startClass}, on its
+  * superclasses up to but excluding {@code stopClass}, or on the interfaces those classes
+  * implement. Class methods come first, most-derived class first, followed by interface
+  * methods.
+  */
+ private static Collection<Method> declaredMethodsWithAnnotation(
+     Class<? extends Annotation> anno, Class<?> startClass, Class<?> stopClass) {
+   Collection<Method> annotated = new ArrayList<>();
+   LinkedHashSet<Class<?>> seenInterfaces = new LinkedHashSet<>();
+
+   // Walk the superclass chain, remembering the interfaces declared at each level.
+   for (Class<?> current = startClass;
+       current != null && !current.equals(stopClass);
+       current = current.getSuperclass()) {
+     for (Method declared : current.getDeclaredMethods()) {
+       if (declared.isAnnotationPresent(anno)) {
+         annotated.add(declared);
+       }
+     }
+     Collections.addAll(seenInterfaces, current.getInterfaces());
+   }
+
+   // Then scan the methods reachable through the interfaces gathered above.
+   for (Method inherited : ReflectHelpers.getClosureOfMethodsOnInterfaces(seenInterfaces)) {
+     if (inherited.isAnnotationPresent(anno)) {
+       annotated.add(inherited);
+     }
+   }
+   return annotated;
+ }
+
+ /**
+  * Finds the method of {@code fnClazz} annotated with {@code anno}.
+  *
+  * <p>All annotated candidates must share one name and parameter list, i.e. be the same
+  * method declared at several levels of the hierarchy. The first candidate found is returned
+  * and must be public and non-static.
+  *
+  * @return the method, or {@code null} if none exists and {@code required} is false
+  * @throws IllegalArgumentException if a required method is missing, if differently named or
+  *     differently parameterized methods carry the annotation, or if the chosen method is
+  *     not public or is static
+  */
+ private static Method findAnnotatedMethod(
+     Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
+   Collection<Method> candidates = declaredMethodsWithAnnotation(anno, fnClazz, DoFn.class);
+
+   if (candidates.isEmpty()) {
+     checkArgument(
+         !required,
+         "No method annotated with @%s found in %s",
+         anno.getSimpleName(),
+         fnClazz.getName());
+     return null;
+   }
+
+   // With at least one candidate, every other candidate must be the same method as the
+   // first one, re-declared in a parent class or interface.
+   Method chosen = candidates.iterator().next();
+   for (Method candidate : candidates) {
+     boolean sameSignature =
+         chosen.getName().equals(candidate.getName())
+             && Arrays.equals(chosen.getParameterTypes(), candidate.getParameterTypes());
+     checkArgument(
+         sameSignature,
+         "Found multiple methods annotated with @%s. [%s] and [%s]",
+         anno.getSimpleName(),
+         format(chosen),
+         format(candidate));
+   }
+
+   int modifiers = chosen.getModifiers();
+
+   // The method has to be callable, so it is required to be public.
+   checkArgument(Modifier.isPublic(modifiers), "%s must be public", format(chosen));
+
+   // It also has to be an instance method.
+   checkArgument(!Modifier.isStatic(modifiers), "%s must not be static", format(chosen));
+
+   return chosen;
+ }
+
+ private static String format(Method m) {
+ return ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(m);
+ }
+
+ /** Renders the type behind {@code t} via {@code ReflectHelpers.TYPE_SIMPLE_DESCRIPTION}. */
+ private static String formatType(TypeToken<?> t) {
+   Type underlying = t.getType();
+   return ReflectHelpers.TYPE_SIMPLE_DESCRIPTION.apply(underlying);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
new file mode 100644
index 0000000..4df5209
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines reflection-based utilities for analyzing {@link org.apache.beam.sdk.transforms.DoFn}'s
+ * and creating {@link org.apache.beam.sdk.transforms.reflect.DoFnSignature}'s and
+ * {@link org.apache.beam.sdk.transforms.reflect.DoFnInvoker}'s from them.
+ */
+package org.apache.beam.sdk.transforms.reflect;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
deleted file mode 100644
index e05e5e2..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
+++ /dev/null
@@ -1,822 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.DoFn.Context;
-import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
-import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
-import org.apache.beam.sdk.transforms.DoFn.Setup;
-import org.apache.beam.sdk.transforms.DoFn.Teardown;
-import org.apache.beam.sdk.transforms.dofnreflector.DoFnReflectorTestHelper;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.UserCodeException;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-import java.lang.reflect.Method;
-
-/**
- * Tests for {@link DoFnReflector}.
- */
-@RunWith(JUnit4.class)
-public class DoFnReflectorTest {
-
- /**
- * A convenience struct holding flags that indicate whether a particular method was invoked.
- */
- public static class Invocations {
- public boolean wasProcessElementInvoked = false;
- public boolean wasStartBundleInvoked = false;
- public boolean wasFinishBundleInvoked = false;
- public boolean wasSetupInvoked = false;
- public boolean wasTeardownInvoked = false;
- private final String name;
-
- public Invocations(String name) {
- this.name = name;
- }
- }
-
- private DoFn<String, String> fn;
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Mock
- private DoFn<String, String>.ProcessContext mockContext;
- @Mock
- private BoundedWindow mockWindow;
- @Mock
- private DoFn.InputProvider<String> mockInputProvider;
- @Mock
- private DoFn.OutputReceiver<String> mockOutputReceiver;
-
- private ExtraContextFactory<String, String> extraContextFactory;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- this.extraContextFactory = new ExtraContextFactory<String, String>() {
- @Override
- public BoundedWindow window() {
- return mockWindow;
- }
-
- @Override
- public DoFn.InputProvider<String> inputProvider() {
- return mockInputProvider;
- }
-
- @Override
- public DoFn.OutputReceiver<String> outputReceiver() {
- return mockOutputReceiver;
- }
- };
- }
-
- private DoFnReflector underTest(DoFn<String, String> fn) {
- this.fn = fn;
- return DoFnReflector.of(fn.getClass());
- }
-
- private void checkInvokeProcessElementWorks(
- DoFnReflector r, Invocations... invocations) throws Exception {
- assertTrue("Need at least one invocation to check", invocations.length >= 1);
- for (Invocations invocation : invocations) {
- assertFalse("Should not yet have called processElement on " + invocation.name,
- invocation.wasProcessElementInvoked);
- }
- r.bindInvoker(fn).invokeProcessElement(mockContext, extraContextFactory);
- for (Invocations invocation : invocations) {
- assertTrue("Should have called processElement on " + invocation.name,
- invocation.wasProcessElementInvoked);
- }
- }
-
- private void checkInvokeStartBundleWorks(
- DoFnReflector r, Invocations... invocations) throws Exception {
- assertTrue("Need at least one invocation to check", invocations.length >= 1);
- for (Invocations invocation : invocations) {
- assertFalse("Should not yet have called startBundle on " + invocation.name,
- invocation.wasStartBundleInvoked);
- }
- r.bindInvoker(fn).invokeStartBundle(mockContext, extraContextFactory);
- for (Invocations invocation : invocations) {
- assertTrue("Should have called startBundle on " + invocation.name,
- invocation.wasStartBundleInvoked);
- }
- }
-
- private void checkInvokeFinishBundleWorks(
- DoFnReflector r, Invocations... invocations) throws Exception {
- assertTrue("Need at least one invocation to check", invocations.length >= 1);
- for (Invocations invocation : invocations) {
- assertFalse("Should not yet have called finishBundle on " + invocation.name,
- invocation.wasFinishBundleInvoked);
- }
- r.bindInvoker(fn).invokeFinishBundle(mockContext, extraContextFactory);
- for (Invocations invocation : invocations) {
- assertTrue("Should have called finishBundle on " + invocation.name,
- invocation.wasFinishBundleInvoked);
- }
- }
-
- private void checkInvokeSetupWorks(DoFnReflector r, Invocations... invocations) throws Exception {
- assertTrue("Need at least one invocation to check", invocations.length >= 1);
- for (Invocations invocation : invocations) {
- assertFalse("Should not yet have called setup on " + invocation.name,
- invocation.wasSetupInvoked);
- }
- r.bindInvoker(fn).invokeSetup();
- for (Invocations invocation : invocations) {
- assertTrue("Should have called setup on " + invocation.name,
- invocation.wasSetupInvoked);
- }
- }
-
- private void checkInvokeTeardownWorks(DoFnReflector r, Invocations... invocations)
- throws Exception {
- assertTrue("Need at least one invocation to check", invocations.length >= 1);
- for (Invocations invocation : invocations) {
- assertFalse("Should not yet have called teardown on " + invocation.name,
- invocation.wasTeardownInvoked);
- }
- r.bindInvoker(fn).invokeTeardown();
- for (Invocations invocation : invocations) {
- assertTrue("Should have called teardown on " + invocation.name,
- invocation.wasTeardownInvoked);
- }
- }
-
- @Test
- public void testDoFnWithNoExtraContext() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFnReflector reflector = underTest(new DoFn<String, String>() {
-
- @ProcessElement
- public void processElement(ProcessContext c)
- throws Exception {
- invocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- }
- });
-
- assertFalse(reflector.usesSingleWindow());
-
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testDoFnInvokersReused() throws Exception {
- // Ensures that we don't create a new Invoker class for every instance of the OldDoFn.
- IdentityParent fn1 = new IdentityParent();
- IdentityParent fn2 = new IdentityParent();
- DoFnReflector reflector1 = underTest(fn1);
- DoFnReflector reflector2 = underTest(fn2);
- assertSame("DoFnReflector instances should be cached and reused for identical types",
- reflector1, reflector2);
- assertSame("Invoker classes should only be generated once for each type",
- reflector1.bindInvoker(fn1).getClass(),
- reflector2.bindInvoker(fn2).getClass());
- }
-
- interface InterfaceWithProcessElement {
- @ProcessElement
- void processElement(DoFn<String, String>.ProcessContext c);
- }
-
- interface LayersOfInterfaces extends InterfaceWithProcessElement {}
-
- private class IdentityUsingInterfaceWithProcessElement
- extends DoFn<String, String>
- implements LayersOfInterfaces {
-
- private Invocations invocations = new Invocations("Named Class");
-
- @Override
- public void processElement(DoFn<String, String>.ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- }
- }
-
- @Test
- public void testDoFnWithProcessElementInterface() throws Exception {
- IdentityUsingInterfaceWithProcessElement fn = new IdentityUsingInterfaceWithProcessElement();
- DoFnReflector reflector = underTest(fn);
- assertFalse(reflector.usesSingleWindow());
- checkInvokeProcessElementWorks(reflector, fn.invocations);
- }
-
- private class IdentityParent extends DoFn<String, String> {
- protected Invocations parentInvocations = new Invocations("IdentityParent");
-
- @ProcessElement
- public void process(ProcessContext c) {
- parentInvocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- }
- }
-
- private class IdentityChildWithoutOverride extends IdentityParent {
- }
-
- private class IdentityChildWithOverride extends IdentityParent {
- protected Invocations childInvocations = new Invocations("IdentityChildWithOverride");
-
- @Override
- public void process(DoFn<String, String>.ProcessContext c) {
- super.process(c);
- childInvocations.wasProcessElementInvoked = true;
- }
- }
-
- @Test
- public void testDoFnWithMethodInSuperclass() throws Exception {
- IdentityChildWithoutOverride fn = new IdentityChildWithoutOverride();
- DoFnReflector reflector = underTest(fn);
- assertFalse(reflector.usesSingleWindow());
- checkInvokeProcessElementWorks(reflector, fn.parentInvocations);
- }
-
- @Test
- public void testDoFnWithMethodInSubclass() throws Exception {
- IdentityChildWithOverride fn = new IdentityChildWithOverride();
- DoFnReflector reflector = underTest(fn);
- assertFalse(reflector.usesSingleWindow());
- checkInvokeProcessElementWorks(reflector, fn.parentInvocations, fn.childInvocations);
- }
-
- @Test
- public void testDoFnWithWindow() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFnReflector reflector = underTest(new DoFn<String, String>() {
-
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow w)
- throws Exception {
- invocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- assertSame(w, mockWindow);
- }
- });
-
- assertTrue(reflector.usesSingleWindow());
-
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testDoFnWithOutputReceiver() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFnReflector reflector = underTest(new DoFn<String, String>() {
-
- @ProcessElement
- public void processElement(ProcessContext c, DoFn.OutputReceiver<String> o)
- throws Exception {
- invocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- assertSame(o, mockOutputReceiver);
- }
- });
-
- assertFalse(reflector.usesSingleWindow());
-
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testDoFnWithInputProvider() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFnReflector reflector = underTest(new DoFn<String, String>() {
-
- @ProcessElement
- public void processElement(ProcessContext c, DoFn.InputProvider<String> i)
- throws Exception {
- invocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- assertSame(i, mockInputProvider);
- }
- });
-
- assertFalse(reflector.usesSingleWindow());
-
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testDoFnWithStartBundle() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFnReflector reflector = underTest(new DoFn<String, String>() {
- @ProcessElement
- public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
-
- @StartBundle
- public void startBundle(Context c) {
- invocations.wasStartBundleInvoked = true;
- assertSame(c, mockContext);
- }
-
- @FinishBundle
- public void finishBundle(Context c) {
- invocations.wasFinishBundleInvoked = true;
- assertSame(c, mockContext);
- }
- });
-
- checkInvokeStartBundleWorks(reflector, invocations);
- checkInvokeFinishBundleWorks(reflector, invocations);
- }
-
- @Test
- public void testDoFnWithSetupTeardown() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFnReflector reflector = underTest(new DoFn<String, String>() {
- @ProcessElement
- public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
-
- @StartBundle
- public void startBundle(Context c) {
- invocations.wasStartBundleInvoked = true;
- assertSame(c, mockContext);
- }
-
- @FinishBundle
- public void finishBundle(Context c) {
- invocations.wasFinishBundleInvoked = true;
- assertSame(c, mockContext);
- }
-
- @Setup
- public void before() {
- invocations.wasSetupInvoked = true;
- }
-
- @Teardown
- public void after() {
- invocations.wasTeardownInvoked = true;
- }
- });
-
- checkInvokeSetupWorks(reflector, invocations);
- checkInvokeTeardownWorks(reflector, invocations);
- }
-
- @Test
- public void testNoProcessElement() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("No method annotated with @ProcessElement found");
- thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFn<String, String>() {});
- }
-
- @Test
- public void testMultipleProcessElement() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Found multiple methods annotated with @ProcessElement");
- thrown.expectMessage("foo()");
- thrown.expectMessage("bar()");
- thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFn<String, String>() {
- @ProcessElement
- public void foo() {}
-
- @ProcessElement
- public void bar() {}
- });
- }
-
- @Test
- public void testMultipleStartBundleElement() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Found multiple methods annotated with @StartBundle");
- thrown.expectMessage("bar()");
- thrown.expectMessage("baz()");
- thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFn<String, String>() {
- @ProcessElement
- public void foo() {}
-
- @StartBundle
- public void bar() {}
-
- @StartBundle
- public void baz() {}
- });
- }
-
- @Test
- public void testMultipleFinishBundleElement() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Found multiple methods annotated with @FinishBundle");
- thrown.expectMessage("bar()");
- thrown.expectMessage("baz()");
- thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFn<String, String>() {
- @ProcessElement
- public void foo() {}
-
- @FinishBundle
- public void bar() {}
-
- @FinishBundle
- public void baz() {}
- });
- }
-
- private static class PrivateDoFnClass extends DoFn<String, String> {
- final Invocations invocations = new Invocations(getClass().getName());
-
- @ProcessElement
- public void processThis(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- }
-
- @Test
- public void testLocalPrivateDoFnClass() throws Exception {
- PrivateDoFnClass fn = new PrivateDoFnClass();
- DoFnReflector reflector = underTest(fn);
- checkInvokeProcessElementWorks(reflector, fn.invocations);
- }
-
- @Test
- public void testStaticPackagePrivateDoFnClass() throws Exception {
- Invocations invocations = new Invocations("StaticPackagePrivateDoFn");
- DoFnReflector reflector =
- underTest(DoFnReflectorTestHelper.newStaticPackagePrivateDoFn(invocations));
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testInnerPackagePrivateDoFnClass() throws Exception {
- Invocations invocations = new Invocations("InnerPackagePrivateDoFn");
- DoFnReflector reflector =
- underTest(new DoFnReflectorTestHelper().newInnerPackagePrivateDoFn(invocations));
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testStaticPrivateDoFnClass() throws Exception {
- Invocations invocations = new Invocations("StaticPrivateDoFn");
- DoFnReflector reflector = underTest(DoFnReflectorTestHelper.newStaticPrivateDoFn(invocations));
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testInnerPrivateDoFnClass() throws Exception {
- Invocations invocations = new Invocations("StaticInnerDoFn");
- DoFnReflector reflector =
- underTest(new DoFnReflectorTestHelper().newInnerPrivateDoFn(invocations));
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testAnonymousInnerDoFnInOtherPackage() throws Exception {
- Invocations invocations = new Invocations("AnonymousInnerDoFnInOtherPackage");
- DoFnReflector reflector =
- underTest(new DoFnReflectorTestHelper().newInnerAnonymousDoFn(invocations));
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testStaticAnonymousDoFnInOtherPackage() throws Exception {
- Invocations invocations = new Invocations("AnonymousStaticDoFnInOtherPackage");
- DoFnReflector reflector =
- underTest(DoFnReflectorTestHelper.newStaticAnonymousDoFn(invocations));
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testPrivateProcessElement() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("process() must be public");
- thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFn<String, String>() {
- @ProcessElement
- private void process() {}
- });
- }
-
- @Test
- public void testPrivateStartBundle() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("startBundle() must be public");
- thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFn<String, String>() {
- @ProcessElement
- public void processElement() {}
-
- @StartBundle
- void startBundle() {}
- });
- }
-
- @Test
- public void testPrivateFinishBundle() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("finishBundle() must be public");
- thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFn<String, String>() {
- @ProcessElement
- public void processElement() {}
-
- @FinishBundle
- void finishBundle() {}
- });
- }
-
- @SuppressWarnings({"unused"})
- private void missingProcessContext() {}
-
- @Test
- public void testMissingProcessContext() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage(getClass().getName()
- + "#missingProcessContext() must take a ProcessContext as its first argument");
-
- DoFnReflector.verifyProcessMethodArguments(
- getClass().getDeclaredMethod("missingProcessContext"));
- }
-
- @SuppressWarnings({"unused"})
- private void badProcessContext(String s) {}
-
- @Test
- public void testBadProcessContextType() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage(getClass().getName()
- + "#badProcessContext(String) must take a ProcessContext as its first argument");
-
- DoFnReflector.verifyProcessMethodArguments(
- getClass().getDeclaredMethod("badProcessContext", String.class));
- }
-
- @SuppressWarnings({"unused"})
- private void badExtraContext(DoFn<Integer, String>.Context c, int n) {}
-
- @Test
- public void testBadExtraContext() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage(
- "int is not a valid context parameter for method "
- + getClass().getName() + "#badExtraContext(Context, int). Should be one of [");
-
- DoFnReflector.verifyBundleMethodArguments(
- getClass().getDeclaredMethod("badExtraContext", Context.class, int.class));
- }
-
- @SuppressWarnings({"unused"})
- private void badExtraProcessContext(
- DoFn<Integer, String>.ProcessContext c, Integer n) {}
-
- @Test
- public void testBadExtraProcessContextType() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage(
- "Integer is not a valid context parameter for method "
- + getClass().getName() + "#badExtraProcessContext(ProcessContext, Integer)"
- + ". Should be one of [BoundedWindow]");
-
- DoFnReflector.verifyProcessMethodArguments(
- getClass().getDeclaredMethod("badExtraProcessContext",
- ProcessContext.class, Integer.class));
- }
-
- @SuppressWarnings("unused")
- private int badReturnType() {
- return 0;
- }
-
- @Test
- public void testBadReturnType() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage(getClass().getName() + "#badReturnType() must have a void return type");
-
- DoFnReflector.verifyProcessMethodArguments(getClass().getDeclaredMethod("badReturnType"));
- }
-
- @SuppressWarnings("unused")
- private void goodGenerics(
- DoFn<Integer, String>.ProcessContext c,
- DoFn.InputProvider<Integer> input,
- DoFn.OutputReceiver<String> output) {}
-
- @Test
- public void testValidGenerics() throws Exception {
- Method method =
- getClass()
- .getDeclaredMethod(
- "goodGenerics",
- DoFn.ProcessContext.class,
- DoFn.InputProvider.class,
- DoFn.OutputReceiver.class);
- DoFnReflector.verifyProcessMethodArguments(method);
- }
-
- @SuppressWarnings("unused")
- private void goodWildcards(
- DoFn<Integer, String>.ProcessContext c,
- DoFn.InputProvider<?> input,
- DoFn.OutputReceiver<?> output) {}
-
- @Test
- public void testGoodWildcards() throws Exception {
- Method method =
- getClass()
- .getDeclaredMethod(
- "goodWildcards",
- DoFn.ProcessContext.class,
- DoFn.InputProvider.class,
- DoFn.OutputReceiver.class);
- DoFnReflector.verifyProcessMethodArguments(method);
- }
-
- @SuppressWarnings("unused")
- private void goodBoundedWildcards(
- DoFn<Integer, String>.ProcessContext c,
- DoFn.InputProvider<? super Integer> input,
- DoFn.OutputReceiver<? super String> output) {}
-
- @Test
- public void testGoodBoundedWildcards() throws Exception {
- Method method =
- getClass()
- .getDeclaredMethod(
- "goodBoundedWildcards",
- DoFn.ProcessContext.class,
- DoFn.InputProvider.class,
- DoFn.OutputReceiver.class);
- DoFnReflector.verifyProcessMethodArguments(method);
- }
-
- @SuppressWarnings("unused")
- private <InputT, OutputT> void goodTypeVariables(
- DoFn<InputT, OutputT>.ProcessContext c,
- DoFn.InputProvider<InputT> input,
- DoFn.OutputReceiver<OutputT> output) {}
-
- @Test
- public void testGoodTypeVariables() throws Exception {
- Method method =
- getClass()
- .getDeclaredMethod(
- "goodTypeVariables",
- DoFn.ProcessContext.class,
- DoFn.InputProvider.class,
- DoFn.OutputReceiver.class);
- DoFnReflector.verifyProcessMethodArguments(method);
- }
-
- @SuppressWarnings("unused")
- private void badGenericTwoArgs(
- DoFn<Integer, String>.ProcessContext c,
- DoFn.InputProvider<Integer> input,
- DoFn.OutputReceiver<Integer> output) {}
-
- @Test
- public void testBadGenericsTwoArgs() throws Exception {
- Method method =
- getClass()
- .getDeclaredMethod(
- "badGenericTwoArgs",
- DoFn.ProcessContext.class,
- DoFn.InputProvider.class,
- DoFn.OutputReceiver.class);
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Incompatible generics in context parameter "
- + "OutputReceiver<Integer> "
- + "for method " + getClass().getName()
- + "#badGenericTwoArgs(ProcessContext, InputProvider, OutputReceiver). Should be "
- + "OutputReceiver<String>");
-
- DoFnReflector.verifyProcessMethodArguments(method);
- }
-
- @SuppressWarnings("unused")
- private void badGenericWildCards(
- DoFn<Integer, String>.ProcessContext c,
- DoFn.InputProvider<Integer> input,
- DoFn.OutputReceiver<? super Integer> output) {}
-
- @Test
- public void testBadGenericWildCards() throws Exception {
- Method method =
- getClass()
- .getDeclaredMethod(
- "badGenericWildCards",
- DoFn.ProcessContext.class,
- DoFn.InputProvider.class,
- DoFn.OutputReceiver.class);
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Incompatible generics in context parameter "
- + "OutputReceiver<? super Integer> for method "
- + getClass().getName()
- + "#badGenericWildCards(ProcessContext, InputProvider, OutputReceiver). Should be "
- + "OutputReceiver<String>");
-
- DoFnReflector.verifyProcessMethodArguments(method);
- }
-
- @SuppressWarnings("unused")
- private <InputT, OutputT> void badTypeVariables(DoFn<InputT, OutputT>.ProcessContext c,
- DoFn.InputProvider<InputT> input, DoFn.OutputReceiver<InputT> output) {}
-
- @Test
- public void testBadTypeVariables() throws Exception {
- Method method =
- getClass()
- .getDeclaredMethod(
- "badTypeVariables",
- DoFn.ProcessContext.class,
- DoFn.InputProvider.class,
- DoFn.OutputReceiver.class);
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Incompatible generics in context parameter "
- + "OutputReceiver<InputT> for method " + getClass().getName()
- + "#badTypeVariables(ProcessContext, InputProvider, OutputReceiver). Should be "
- + "OutputReceiver<OutputT>");
-
- DoFnReflector.verifyProcessMethodArguments(method);
- }
-
- @Test
- public void testProcessElementException() throws Exception {
- DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(@SuppressWarnings("unused") ProcessContext c) {
- throw new IllegalArgumentException("bogus");
- }
- };
-
- thrown.expect(UserCodeException.class);
- thrown.expectMessage("bogus");
- DoFnReflector.of(fn.getClass()).bindInvoker(fn).invokeProcessElement(null, null);
- }
-
- @Test
- public void testStartBundleException() throws Exception {
- DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
- @StartBundle
- public void startBundle(@SuppressWarnings("unused") Context c) {
- throw new IllegalArgumentException("bogus");
- }
-
- @ProcessElement
- public void processElement(@SuppressWarnings("unused") ProcessContext c) {
- }
- };
-
- thrown.expect(UserCodeException.class);
- thrown.expectMessage("bogus");
- DoFnReflector.of(fn.getClass()).bindInvoker(fn).invokeStartBundle(null, null);
- }
-
- @Test
- public void testFinishBundleException() throws Exception {
- DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
- @FinishBundle
- public void finishBundle(@SuppressWarnings("unused") Context c) {
- throw new IllegalArgumentException("bogus");
- }
-
- @ProcessElement
- public void processElement(@SuppressWarnings("unused") ProcessContext c) {
- }
- };
-
- thrown.expect(UserCodeException.class);
- thrown.expectMessage("bogus");
- DoFnReflector.of(fn.getClass()).bindInvoker(fn).invokeFinishBundle(null, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index 604536b..3469223 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -99,7 +99,7 @@ public class FlattenTest implements Serializable {
PCollection<String> output =
makePCollectionListOfStrings(p, inputs)
.apply(Flatten.<String>pCollections())
- .apply(ParDo.of(new IdentityFn<String>(){}));
+ .apply(ParDo.of(new IdentityFn<String>()));
PAssert.that(output).containsInAnyOrder(flattenLists(inputs));
p.run();
@@ -152,7 +152,7 @@ public class FlattenTest implements Serializable {
PCollection<String> output =
PCollectionList.<String>empty(p)
.apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of())
- .apply(ParDo.of(new IdentityFn<String>(){}));
+ .apply(ParDo.of(new IdentityFn<String>()));
PAssert.that(output).empty();
p.run();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java
deleted file mode 100644
index 90fba12..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.dofnreflector;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnReflectorTest.Invocations;
-
-/**
- * Test helper for DoFnReflectorTest, which needs to test package-private access
- * to DoFns in other packages.
- */
-public class DoFnReflectorTestHelper {
-
- private static class StaticPrivateDoFn extends DoFn<String, String> {
- final Invocations invocations;
-
- public StaticPrivateDoFn(Invocations invocations) {
- this.invocations = invocations;
- }
-
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- }
-
- private class InnerPrivateDoFn extends DoFn<String, String> {
- final Invocations invocations;
-
- public InnerPrivateDoFn(Invocations invocations) {
- this.invocations = invocations;
- }
-
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- }
-
- static class StaticPackagePrivateDoFn extends DoFn<String, String> {
- final Invocations invocations;
-
- public StaticPackagePrivateDoFn(Invocations invocations) {
- this.invocations = invocations;
- }
-
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- }
-
- class InnerPackagePrivateDoFn extends DoFn<String, String> {
- final Invocations invocations;
-
- public InnerPackagePrivateDoFn(Invocations invocations) {
- this.invocations = invocations;
- }
-
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- }
-
- public static DoFn<String, String> newStaticPackagePrivateDoFn(
- Invocations invocations) {
- return new StaticPackagePrivateDoFn(invocations);
- }
-
- public DoFn<String, String> newInnerPackagePrivateDoFn(Invocations invocations) {
- return new InnerPackagePrivateDoFn(invocations);
- }
-
- public static DoFn<String, String> newStaticPrivateDoFn(Invocations invocations) {
- return new StaticPrivateDoFn(invocations);
- }
-
- public DoFn<String, String> newInnerPrivateDoFn(Invocations invocations) {
- return new InnerPrivateDoFn(invocations);
- }
-
- public DoFn<String, String> newInnerAnonymousDoFn(final Invocations invocations) {
- return new DoFn<String, String>() {
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- };
- }
-
- public static DoFn<String, String> newStaticAnonymousDoFn(
- final Invocations invocations) {
- return new DoFn<String, String>() {
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
new file mode 100644
index 0000000..7e756e2
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.UserCodeException;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link DoFnInvokers}. */
+public class DoFnInvokersTest {
+ /** A convenience struct holding flags that indicate whether a particular method was invoked. */
+ public static class Invocations {
+ public boolean wasProcessElementInvoked = false;
+ public boolean wasStartBundleInvoked = false;
+ public boolean wasFinishBundleInvoked = false;
+ public boolean wasSetupInvoked = false;
+ public boolean wasTeardownInvoked = false;
+ private final String name;
+
+ public Invocations(String name) {
+ this.name = name;
+ }
+ }
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Mock private DoFn.ProcessContext mockContext;
+ @Mock private BoundedWindow mockWindow;
+ @Mock private DoFn.InputProvider<String> mockInputProvider;
+ @Mock private DoFn.OutputReceiver<String> mockOutputReceiver;
+
+ private DoFn.ExtraContextFactory<String, String> extraContextFactory;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ this.extraContextFactory =
+ new DoFn.ExtraContextFactory<String, String>() {
+ @Override
+ public BoundedWindow window() {
+ return mockWindow;
+ }
+
+ @Override
+ public DoFn.InputProvider<String> inputProvider() {
+ return mockInputProvider;
+ }
+
+ @Override
+ public DoFn.OutputReceiver<String> outputReceiver() {
+ return mockOutputReceiver;
+ }
+ };
+ }
+
+ private void checkInvokeProcessElementWorks(DoFn<String, String> fn, Invocations... invocations)
+ throws Exception {
+ assertTrue("Need at least one invocation to check", invocations.length >= 1);
+ for (Invocations invocation : invocations) {
+ assertFalse(
+ "Should not yet have called processElement on " + invocation.name,
+ invocation.wasProcessElementInvoked);
+ }
+ DoFnInvokers.INSTANCE
+ .newByteBuddyInvoker(fn)
+ .invokeProcessElement(mockContext, extraContextFactory);
+ for (Invocations invocation : invocations) {
+ assertTrue(
+ "Should have called processElement on " + invocation.name,
+ invocation.wasProcessElementInvoked);
+ }
+ }
+
+ private void checkInvokeStartBundleWorks(DoFn<String, String> fn, Invocations... invocations)
+ throws Exception {
+ assertTrue("Need at least one invocation to check", invocations.length >= 1);
+ for (Invocations invocation : invocations) {
+ assertFalse(
+ "Should not yet have called startBundle on " + invocation.name,
+ invocation.wasStartBundleInvoked);
+ }
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeStartBundle(mockContext);
+ for (Invocations invocation : invocations) {
+ assertTrue(
+ "Should have called startBundle on " + invocation.name, invocation.wasStartBundleInvoked);
+ }
+ }
+
+ private void checkInvokeFinishBundleWorks(DoFn<String, String> fn, Invocations... invocations)
+ throws Exception {
+ assertTrue("Need at least one invocation to check", invocations.length >= 1);
+ for (Invocations invocation : invocations) {
+ assertFalse(
+ "Should not yet have called finishBundle on " + invocation.name,
+ invocation.wasFinishBundleInvoked);
+ }
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeFinishBundle(mockContext);
+ for (Invocations invocation : invocations) {
+ assertTrue(
+ "Should have called finishBundle on " + invocation.name,
+ invocation.wasFinishBundleInvoked);
+ }
+ }
+
+ private void checkInvokeSetupWorks(DoFn<String, String> fn, Invocations... invocations)
+ throws Exception {
+ assertTrue("Need at least one invocation to check", invocations.length >= 1);
+ for (Invocations invocation : invocations) {
+ assertFalse(
+ "Should not yet have called setup on " + invocation.name, invocation.wasSetupInvoked);
+ }
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeSetup();
+ for (Invocations invocation : invocations) {
+ assertTrue("Should have called setup on " + invocation.name, invocation.wasSetupInvoked);
+ }
+ }
+
+ private void checkInvokeTeardownWorks(DoFn<String, String> fn, Invocations... invocations)
+ throws Exception {
+ assertTrue("Need at least one invocation to check", invocations.length >= 1);
+ for (Invocations invocation : invocations) {
+ assertFalse(
+ "Should not yet have called teardown on " + invocation.name,
+ invocation.wasTeardownInvoked);
+ }
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeTeardown();
+ for (Invocations invocation : invocations) {
+ assertTrue(
+ "Should have called teardown on " + invocation.name, invocation.wasTeardownInvoked);
+ }
+ }
+
+ @Test
+ public void testDoFnWithNoExtraContext() throws Exception {
+ final Invocations invocations = new Invocations("AnonymousClass");
+ DoFn<String, String> fn =
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ invocations.wasProcessElementInvoked = true;
+ assertSame(mockContext, c); // JUnit order: expected, actual
+ }
+ };
+
+ assertFalse(
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(fn.getClass())
+ .processElement()
+ .usesSingleWindow());
+
+ checkInvokeProcessElementWorks(fn, invocations);
+ }
+
+ @Test
+ public void testDoFnInvokersReused() throws Exception {
+ // Ensures that we don't create a new Invoker class for every instance of the DoFn.
+ IdentityParent fn1 = new IdentityParent();
+ IdentityParent fn2 = new IdentityParent();
+ assertSame(
+ "Invoker classes should only be generated once for each type",
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn1).getClass(),
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn2).getClass());
+ }
+
+ interface InterfaceWithProcessElement {
+ @DoFn.ProcessElement
+ void processElement(DoFn<String, String>.ProcessContext c);
+ }
+
+ interface LayersOfInterfaces extends InterfaceWithProcessElement {}
+
+ private class IdentityUsingInterfaceWithProcessElement extends DoFn<String, String>
+ implements LayersOfInterfaces {
+
+ private final Invocations invocations = new Invocations("Named Class");
+
+ @Override
+ public void processElement(DoFn<String, String>.ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ assertSame(mockContext, c); // JUnit order: expected, actual
+ }
+ }
+
+ @Test
+ public void testDoFnWithProcessElementInterface() throws Exception {
+ IdentityUsingInterfaceWithProcessElement fn = new IdentityUsingInterfaceWithProcessElement();
+ assertFalse(
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(fn.getClass())
+ .processElement()
+ .usesSingleWindow());
+ checkInvokeProcessElementWorks(fn, fn.invocations);
+ }
+
+ private class IdentityParent extends DoFn<String, String> {
+ protected final Invocations parentInvocations = new Invocations("IdentityParent");
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ parentInvocations.wasProcessElementInvoked = true;
+ assertSame(mockContext, c); // JUnit order: expected, actual
+ }
+ }
+
+ private class IdentityChildWithoutOverride extends IdentityParent {}
+
+ private class IdentityChildWithOverride extends IdentityParent {
+ protected Invocations childInvocations = new Invocations("IdentityChildWithOverride");
+
+ @Override
+ public void process(DoFn<String, String>.ProcessContext c) {
+ super.process(c);
+ childInvocations.wasProcessElementInvoked = true;
+ }
+ }
+
+ @Test
+ public void testDoFnWithMethodInSuperclass() throws Exception {
+ IdentityChildWithoutOverride fn = new IdentityChildWithoutOverride();
+ assertFalse(
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(fn.getClass())
+ .processElement()
+ .usesSingleWindow());
+ checkInvokeProcessElementWorks(fn, fn.parentInvocations);
+ }
+
+ @Test
+ public void testDoFnWithMethodInSubclass() throws Exception {
+ IdentityChildWithOverride fn = new IdentityChildWithOverride();
+ assertFalse(
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(fn.getClass())
+ .processElement()
+ .usesSingleWindow());
+ checkInvokeProcessElementWorks(fn, fn.parentInvocations, fn.childInvocations);
+ }
+
+ @Test
+ public void testDoFnWithWindow() throws Exception {
+ final Invocations invocations = new Invocations("AnonymousClass");
+ DoFn<String, String> fn =
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow w) throws Exception {
+ invocations.wasProcessElementInvoked = true;
+ assertSame(mockContext, c); // JUnit order: expected, actual
+ assertSame(mockWindow, w);
+ }
+ };
+
+ assertTrue(
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(fn.getClass())
+ .processElement()
+ .usesSingleWindow());
+
+ checkInvokeProcessElementWorks(fn, invocations);
+ }
+
+ @Test
+ public void testDoFnWithOutputReceiver() throws Exception {
+ final Invocations invocations = new Invocations("AnonymousClass");
+ DoFn<String, String> fn =
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c, OutputReceiver<String> o) throws Exception {
+ invocations.wasProcessElementInvoked = true;
+ assertSame(mockContext, c); // JUnit order: expected, actual
+ assertSame(mockOutputReceiver, o);
+ }
+ };
+
+ assertFalse(
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(fn.getClass())
+ .processElement()
+ .usesSingleWindow());
+
+ checkInvokeProcessElementWorks(fn, invocations);
+ }
+
+ @Test
+ public void testDoFnWithInputProvider() throws Exception {
+ final Invocations invocations = new Invocations("AnonymousClass");
+ DoFn<String, String> fn =
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c, InputProvider<String> i) throws Exception {
+ invocations.wasProcessElementInvoked = true;
+ assertSame(mockContext, c); // JUnit order: expected, actual
+ assertSame(mockInputProvider, i);
+ }
+ };
+
+ assertFalse(
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(fn.getClass())
+ .processElement()
+ .usesSingleWindow());
+
+ checkInvokeProcessElementWorks(fn, invocations);
+ }
+
+ @Test
+ public void testDoFnWithStartBundle() throws Exception {
+ final Invocations invocations = new Invocations("AnonymousClass");
+ DoFn<String, String> fn =
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+
+ @StartBundle
+ public void startBundle(Context c) {
+ invocations.wasStartBundleInvoked = true;
+ assertSame(mockContext, c); // JUnit order: expected, actual
+ }
+
+ @FinishBundle
+ public void finishBundle(Context c) {
+ invocations.wasFinishBundleInvoked = true;
+ assertSame(mockContext, c); // JUnit order: expected, actual
+ }
+ };
+
+ checkInvokeStartBundleWorks(fn, invocations);
+ checkInvokeFinishBundleWorks(fn, invocations);
+ }
+
+ @Test
+ public void testDoFnWithSetupTeardown() throws Exception {
+ final Invocations invocations = new Invocations("AnonymousClass");
+ DoFn<String, String> fn =
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+
+ @StartBundle
+ public void startBundle(Context c) {
+ invocations.wasStartBundleInvoked = true;
+ assertSame(mockContext, c); // JUnit order: expected, actual
+ }
+
+ @FinishBundle
+ public void finishBundle(Context c) {
+ invocations.wasFinishBundleInvoked = true;
+ assertSame(mockContext, c); // JUnit order: expected, actual
+ }
+
+ @Setup
+ public void before() {
+ invocations.wasSetupInvoked = true;
+ }
+
+ @Teardown
+ public void after() {
+ invocations.wasTeardownInvoked = true;
+ }
+ };
+
+ checkInvokeSetupWorks(fn, invocations);
+ checkInvokeTeardownWorks(fn, invocations);
+ }
+
+ private static class PrivateDoFnClass extends DoFn<String, String> {
+ final Invocations invocations = new Invocations(getClass().getName());
+
+ @ProcessElement
+ public void processThis(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ }
+
+ @Test
+ public void testLocalPrivateDoFnClass() throws Exception {
+ PrivateDoFnClass fn = new PrivateDoFnClass();
+ checkInvokeProcessElementWorks(fn, fn.invocations);
+ }
+
+ @Test
+ public void testStaticPackagePrivateDoFnClass() throws Exception {
+ Invocations invocations = new Invocations("StaticPackagePrivateDoFn");
+ checkInvokeProcessElementWorks(
+ DoFnInvokersTestHelper.newStaticPackagePrivateDoFn(invocations), invocations);
+ }
+
+ @Test
+ public void testInnerPackagePrivateDoFnClass() throws Exception {
+ Invocations invocations = new Invocations("InnerPackagePrivateDoFn");
+ checkInvokeProcessElementWorks(
+ new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn(invocations), invocations);
+ }
+
+ @Test
+ public void testStaticPrivateDoFnClass() throws Exception {
+ Invocations invocations = new Invocations("StaticPrivateDoFn");
+ checkInvokeProcessElementWorks(
+ DoFnInvokersTestHelper.newStaticPrivateDoFn(invocations), invocations);
+ }
+
+ @Test
+ public void testInnerPrivateDoFnClass() throws Exception {
+ Invocations invocations = new Invocations("InnerPrivateDoFn"); // shown in failure text
+ checkInvokeProcessElementWorks(
+ new DoFnInvokersTestHelper().newInnerPrivateDoFn(invocations), invocations);
+ }
+
+ @Test
+ public void testAnonymousInnerDoFnInOtherPackage() throws Exception {
+ Invocations invocations = new Invocations("AnonymousInnerDoFnInOtherPackage");
+ checkInvokeProcessElementWorks(
+ new DoFnInvokersTestHelper().newInnerAnonymousDoFn(invocations), invocations);
+ }
+
+ @Test
+ public void testStaticAnonymousDoFnInOtherPackage() throws Exception {
+ Invocations invocations = new Invocations("AnonymousStaticDoFnInOtherPackage");
+ checkInvokeProcessElementWorks(
+ DoFnInvokersTestHelper.newStaticAnonymousDoFn(invocations), invocations);
+ }
+
+ @Test
+ public void testProcessElementException() throws Exception {
+ DoFn<Integer, Integer> fn =
+ new DoFn<Integer, Integer>() {
+ @ProcessElement
+ public void processElement(@SuppressWarnings("unused") ProcessContext c) {
+ throw new IllegalArgumentException("bogus");
+ }
+ };
+
+ thrown.expect(UserCodeException.class);
+ thrown.expectMessage("bogus");
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeProcessElement(null, null);
+ }
+
+ @Test
+ public void testStartBundleException() throws Exception {
+ DoFn<Integer, Integer> fn =
+ new DoFn<Integer, Integer>() {
+ @StartBundle
+ public void startBundle(@SuppressWarnings("unused") Context c) {
+ throw new IllegalArgumentException("bogus");
+ }
+
+ @ProcessElement
+ public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+ };
+
+ thrown.expect(UserCodeException.class);
+ thrown.expectMessage("bogus");
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeStartBundle(null);
+ }
+
+ @Test
+ public void testFinishBundleException() throws Exception {
+ DoFn<Integer, Integer> fn =
+ new DoFn<Integer, Integer>() {
+ @FinishBundle
+ public void finishBundle(@SuppressWarnings("unused") Context c) {
+ throw new IllegalArgumentException("bogus");
+ }
+
+ @ProcessElement
+ public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+ };
+
+ thrown.expect(UserCodeException.class);
+ thrown.expectMessage("bogus");
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeFinishBundle(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
new file mode 100644
index 0000000..7bfdddc
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokersTest.Invocations;
+
+/**
+ * Test helper for {@link DoFnInvokersTest}, which needs to test package-private access
+ * to DoFns in other packages.
+ */
+public class DoFnInvokersTestHelper {
+
+ private static class StaticPrivateDoFn extends DoFn<String, String> {
+ final Invocations invocations;
+
+ public StaticPrivateDoFn(Invocations invocations) {
+ this.invocations = invocations;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ }
+
+ private class InnerPrivateDoFn extends DoFn<String, String> {
+ final Invocations invocations;
+
+ public InnerPrivateDoFn(Invocations invocations) {
+ this.invocations = invocations;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ }
+
+ static class StaticPackagePrivateDoFn extends DoFn<String, String> {
+ final Invocations invocations;
+
+ public StaticPackagePrivateDoFn(Invocations invocations) {
+ this.invocations = invocations;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ }
+
+ class InnerPackagePrivateDoFn extends DoFn<String, String> {
+ final Invocations invocations;
+
+ public InnerPackagePrivateDoFn(Invocations invocations) {
+ this.invocations = invocations;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ }
+
+ public static DoFn<String, String> newStaticPackagePrivateDoFn(
+ Invocations invocations) {
+ return new StaticPackagePrivateDoFn(invocations);
+ }
+
+ public DoFn<String, String> newInnerPackagePrivateDoFn(Invocations invocations) {
+ return new InnerPackagePrivateDoFn(invocations);
+ }
+
+ public static DoFn<String, String> newStaticPrivateDoFn(Invocations invocations) {
+ return new StaticPrivateDoFn(invocations);
+ }
+
+ public DoFn<String, String> newInnerPrivateDoFn(Invocations invocations) {
+ return new InnerPrivateDoFn(invocations);
+ }
+
+ public DoFn<String, String> newInnerAnonymousDoFn(final Invocations invocations) {
+ return new DoFn<String, String>() {
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ };
+ }
+
+ public static DoFn<String, String> newStaticAnonymousDoFn(
+ final Invocations invocations) {
+ return new DoFn<String, String>() {
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ };
+ }
+}
[4/4] incubator-beam git commit: Closes #812
Posted by bc...@apache.org.
Closes #812
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/89367cfb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/89367cfb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/89367cfb
Branch: refs/heads/master
Commit: 89367cfb19ae86d66441970277177512961d3b6a
Parents: 4609773 fbf77f9
Author: bchambers <bc...@google.com>
Authored: Wed Aug 17 15:43:47 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Aug 17 15:43:47 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/DoFn.java | 17 +-
.../beam/sdk/transforms/DoFnAdapters.java | 281 +++++
.../beam/sdk/transforms/DoFnReflector.java | 1150 ------------------
.../apache/beam/sdk/transforms/DoFnTester.java | 2 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 6 +-
.../sdk/transforms/reflect/DoFnInvoker.java | 61 +
.../sdk/transforms/reflect/DoFnInvokers.java | 506 ++++++++
.../sdk/transforms/reflect/DoFnSignature.java | 113 ++
.../sdk/transforms/reflect/DoFnSignatures.java | 321 +++++
.../sdk/transforms/reflect/package-info.java | 23 +
.../beam/sdk/transforms/DoFnReflectorTest.java | 822 -------------
.../apache/beam/sdk/transforms/FlattenTest.java | 4 +-
.../dofnreflector/DoFnReflectorTestHelper.java | 116 --
.../transforms/reflect/DoFnInvokersTest.java | 498 ++++++++
.../reflect/DoFnInvokersTestHelper.java | 116 ++
.../transforms/reflect/DoFnSignaturesTest.java | 371 ++++++
.../transforms/DoFnInvokersBenchmark.java | 224 ++++
.../transforms/DoFnReflectorBenchmark.java | 232 ----
18 files changed, 2529 insertions(+), 2334 deletions(-)
----------------------------------------------------------------------
[3/4] incubator-beam git commit: Rewrites DoFnReflector to go via
DoFnSignature
Posted by bc...@apache.org.
Rewrites DoFnReflector to go via DoFnSignature
DoFnSignature encapsulates type information about a DoFn,
in particular which arguments/features its methods
actually use.
Before this commit, DoFnReflector would parse/verify/generate
code in one go; after this commit, these stages are separated:
DoFnSignature encapsulates all information needed to generate
the code.
Additionally, removes the unnecessary genericity in the
implementation of DoFnReflector's code generation for the
very different methods processElement and start/finishBundle.
The code is simpler if decomposed into utility functions,
rather than attempting a uniform representation for different
methods.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fbf77f90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fbf77f90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fbf77f90
Branch: refs/heads/master
Commit: fbf77f90e0391304a580178f99441256526c4b0e
Parents: 4609773
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Aug 9 17:16:00 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Aug 17 15:43:46 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/DoFn.java | 17 +-
.../beam/sdk/transforms/DoFnAdapters.java | 281 +++++
.../beam/sdk/transforms/DoFnReflector.java | 1150 ------------------
.../apache/beam/sdk/transforms/DoFnTester.java | 2 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 6 +-
.../sdk/transforms/reflect/DoFnInvoker.java | 61 +
.../sdk/transforms/reflect/DoFnInvokers.java | 506 ++++++++
.../sdk/transforms/reflect/DoFnSignature.java | 113 ++
.../sdk/transforms/reflect/DoFnSignatures.java | 321 +++++
.../sdk/transforms/reflect/package-info.java | 23 +
.../beam/sdk/transforms/DoFnReflectorTest.java | 822 -------------
.../apache/beam/sdk/transforms/FlattenTest.java | 4 +-
.../dofnreflector/DoFnReflectorTestHelper.java | 116 --
.../transforms/reflect/DoFnInvokersTest.java | 498 ++++++++
.../reflect/DoFnInvokersTestHelper.java | 116 ++
.../transforms/reflect/DoFnSignaturesTest.java | 371 ++++++
.../transforms/DoFnInvokersBenchmark.java | 224 ++++
.../transforms/DoFnReflectorBenchmark.java | 232 ----
18 files changed, 2529 insertions(+), 2334 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 80b67af..2348783 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollectionView;
@@ -247,7 +248,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
/////////////////////////////////////////////////////////////////////////////
- Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>();
+ protected Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>();
/**
* Protects aggregators from being created after initialization.
@@ -283,7 +284,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
/**
* Interface for runner implementors to provide implementations of extra context information.
*
- * <p>The methods on this interface are called by {@link DoFnReflector} before invoking an
+ * <p>The methods on this interface are called by {@link DoFnInvoker} before invoking an
* annotated {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that
* has indicated it needs the given extra context.
*
@@ -301,23 +302,23 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
BoundedWindow window();
/**
- * A placeholder for testing purposes. The return type itself is package-private and not
- * implemented.
+ * A placeholder for testing purposes.
*/
InputProvider<InputT> inputProvider();
/**
- * A placeholder for testing purposes. The return type itself is package-private and not
- * implemented.
+ * A placeholder for testing purposes.
*/
OutputReceiver<OutputT> outputReceiver();
}
- static interface OutputReceiver<T> {
+ /** A placeholder for testing handling of output types during {@link DoFn} reflection. */
+ public interface OutputReceiver<T> {
void output(T output);
}
- static interface InputProvider<T> {
+ /** A placeholder for testing handling of input types during {@link DoFn} reflection. */
+ public interface InputProvider<T> {
T get();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
new file mode 100644
index 0000000..71a148f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+
+/**
+ * Utility class containing adapters for running a {@link DoFn} as an {@link OldDoFn}.
+ *
+ * @deprecated This class will go away when we start running {@link DoFn}'s directly (using
+ * {@link DoFnInvoker}) rather than via {@link OldDoFn}.
+ */
+@Deprecated
+public class DoFnAdapters {
+ /** Should not be instantiated. */
+ private DoFnAdapters() {}
+
+ /**
+ * If this is an {@link OldDoFn} produced via {@link #toOldDoFn}, returns the class of the
+ * original {@link DoFn}, otherwise returns {@code fn.getClass()}.
+ */
+ public static Class<?> getDoFnClass(OldDoFn<?, ?> fn) {
+ if (fn instanceof SimpleDoFnAdapter) {
+ return ((SimpleDoFnAdapter<?, ?>) fn).fn.getClass();
+ } else {
+ return fn.getClass();
+ }
+ }
+
+ /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
+ public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
+ DoFnSignature signature = DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass());
+ if (signature.processElement().usesSingleWindow()) {
+ return new WindowDoFnAdapter<>(fn);
+ } else {
+ return new SimpleDoFnAdapter<>(fn);
+ }
+ }
+
+ /**
+ * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
+ * OldDoFn}.
+ */
+ private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
+ private final DoFn<InputT, OutputT> fn;
+ private transient DoFnInvoker<InputT, OutputT> invoker;
+
+ SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
+ super(fn.aggregators);
+ this.fn = fn;
+ this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+ }
+
+ @Override
+ public void setup() throws Exception {
+ this.invoker.invokeSetup();
+ }
+
+ @Override
+ public void startBundle(Context c) throws Exception {
+ this.fn.prepareForProcessing();
+ invoker.invokeStartBundle(new ContextAdapter<>(fn, c));
+ }
+
+ @Override
+ public void finishBundle(Context c) throws Exception {
+ invoker.invokeFinishBundle(new ContextAdapter<>(fn, c));
+ }
+
+ @Override
+ public void teardown() throws Exception {
+ this.invoker.invokeTeardown();
+ }
+
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
+ invoker.invokeProcessElement(adapter, adapter);
+ }
+
+ @Override
+ protected TypeDescriptor<InputT> getInputTypeDescriptor() {
+ return fn.getInputTypeDescriptor();
+ }
+
+ @Override
+ protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+ return fn.getOutputTypeDescriptor();
+ }
+
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return fn.getAllowedTimestampSkew();
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.include(fn);
+ }
+
+ private void readObject(java.io.ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+ }
+ }
+
+ /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */
+ private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT>
+ implements OldDoFn.RequiresWindowAccess {
+
+ WindowDoFnAdapter(DoFn<InputT, OutputT> fn) {
+ super(fn);
+ }
+ }
+
+ /**
+ * Wraps an {@link OldDoFn.Context} as a {@link DoFn.ExtraContextFactory} inside a {@link
+ * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is
+ * unavailable.
+ */
+ private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
+ implements DoFn.ExtraContextFactory<InputT, OutputT> {
+
+ private OldDoFn<InputT, OutputT>.Context context;
+
+ private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
+ fn.super();
+ this.context = context;
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return context.getPipelineOptions();
+ }
+
+ @Override
+ public void output(OutputT output) {
+ context.output(output);
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ context.outputWithTimestamp(output, timestamp);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ context.sideOutput(tag, output);
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ context.sideOutputWithTimestamp(tag, output, timestamp);
+ }
+
+ @Override
+ public BoundedWindow window() {
+ // The DoFn doesn't allow us to ask for these outside ProcessElements, so this
+ // should be unreachable.
+ throw new UnsupportedOperationException("Can only get the window in ProcessElements");
+ }
+
+ @Override
+ public DoFn.InputProvider<InputT> inputProvider() {
+ throw new UnsupportedOperationException("inputProvider() exists only for testing");
+ }
+
+ @Override
+ public DoFn.OutputReceiver<OutputT> outputReceiver() {
+ throw new UnsupportedOperationException("outputReceiver() exists only for testing");
+ }
+ }
+
+ /**
+ * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFn.ExtraContextFactory} inside a {@link
+ * DoFn.ProcessElement} method.
+ */
+ private static class ProcessContextAdapter<InputT, OutputT>
+ extends DoFn<InputT, OutputT>.ProcessContext
+ implements DoFn.ExtraContextFactory<InputT, OutputT> {
+
+ private OldDoFn<InputT, OutputT>.ProcessContext context;
+
+ private ProcessContextAdapter(
+ DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) {
+ fn.super();
+ this.context = context;
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return context.getPipelineOptions();
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ return context.sideInput(view);
+ }
+
+ @Override
+ public void output(OutputT output) {
+ context.output(output);
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ context.outputWithTimestamp(output, timestamp);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ context.sideOutput(tag, output);
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ context.sideOutputWithTimestamp(tag, output, timestamp);
+ }
+
+ @Override
+ public InputT element() {
+ return context.element();
+ }
+
+ @Override
+ public Instant timestamp() {
+ return context.timestamp();
+ }
+
+ @Override
+ public PaneInfo pane() {
+ return context.pane();
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return context.window();
+ }
+
+ @Override
+ public DoFn.InputProvider<InputT> inputProvider() {
+ throw new UnsupportedOperationException("inputProvider() exists only for testing");
+ }
+
+ @Override
+ public DoFn.OutputReceiver<OutputT> outputReceiver() {
+ throw new UnsupportedOperationException("outputReceiver() exists only for testing");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
deleted file mode 100644
index bf04041..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
+++ /dev/null
@@ -1,1150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
-import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
-import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
-import org.apache.beam.sdk.transforms.DoFn.Setup;
-import org.apache.beam.sdk.transforms.DoFn.StartBundle;
-import org.apache.beam.sdk.transforms.DoFn.Teardown;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.reflect.TypeParameter;
-import com.google.common.reflect.TypeToken;
-
-import net.bytebuddy.ByteBuddy;
-import net.bytebuddy.NamingStrategy.SuffixingRandom;
-import net.bytebuddy.description.field.FieldDescription;
-import net.bytebuddy.description.method.MethodDescription;
-import net.bytebuddy.description.method.ParameterList;
-import net.bytebuddy.description.modifier.FieldManifestation;
-import net.bytebuddy.description.modifier.Visibility;
-import net.bytebuddy.description.type.TypeDescription;
-import net.bytebuddy.description.type.TypeDescription.Generic;
-import net.bytebuddy.dynamic.DynamicType;
-import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
-import net.bytebuddy.dynamic.scaffold.InstrumentedType;
-import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy.Default;
-import net.bytebuddy.implementation.Implementation;
-import net.bytebuddy.implementation.MethodCall.MethodLocator;
-import net.bytebuddy.implementation.StubMethod;
-import net.bytebuddy.implementation.bind.MethodDelegationBinder.MethodInvoker;
-import net.bytebuddy.implementation.bind.annotation.TargetMethodAnnotationDrivenBinder.TerminationHandler;
-import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
-import net.bytebuddy.implementation.bytecode.Duplication;
-import net.bytebuddy.implementation.bytecode.StackManipulation;
-import net.bytebuddy.implementation.bytecode.Throw;
-import net.bytebuddy.implementation.bytecode.assign.Assigner;
-import net.bytebuddy.implementation.bytecode.member.FieldAccess;
-import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
-import net.bytebuddy.implementation.bytecode.member.MethodReturn;
-import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
-import net.bytebuddy.jar.asm.Label;
-import net.bytebuddy.jar.asm.MethodVisitor;
-import net.bytebuddy.jar.asm.Opcodes;
-import net.bytebuddy.matcher.ElementMatchers;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-
-/**
- * Utility implementing the necessary reflection for working with {@link DoFn}s.
- */
-public abstract class DoFnReflector {
-
- private static final String FN_DELEGATE_FIELD_NAME = "delegate";
-
- private enum Availability {
- /** Indicates parameters only available in {@code @ProcessElement} methods. */
- PROCESS_ELEMENT_ONLY,
- /** Indicates parameters available in all methods. */
- EVERYWHERE
- }
-
- /**
- * Enumeration of the parameters available from the {@link ExtraContextFactory} to use as
- * additional parameters for {@link DoFn} methods.
- * <p>
- * We don't rely on looking for properly annotated methods within {@link ExtraContextFactory}
- * because erasure would make it impossible to completely fill in the type token for context
- * parameters that depend on the input/output type.
- */
- private enum AdditionalParameter {
-
- /** Any {@link BoundedWindow} parameter is populated by the window of the current element. */
- WINDOW_OF_ELEMENT(Availability.PROCESS_ELEMENT_ONLY, BoundedWindow.class, "window") {
- @Override
- public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
- return TypeToken.of(BoundedWindow.class);
- }
- },
-
- INPUT_PROVIDER(Availability.PROCESS_ELEMENT_ONLY, DoFn.InputProvider.class, "inputProvider") {
- @Override
- public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
- return new TypeToken<DoFn.InputProvider<InputT>>() {}.where(
- new TypeParameter<InputT>() {}, in);
- }
-
- @Override
- public boolean isHidden() {
- return true;
- }
- },
-
- OUTPUT_RECEIVER(
- Availability.PROCESS_ELEMENT_ONLY, DoFn.OutputReceiver.class, "outputReceiver") {
- @Override
- public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
- return new TypeToken<DoFn.OutputReceiver<OutputT>>() {}.where(
- new TypeParameter<OutputT>() {}, out);
- }
-
- @Override
- public boolean isHidden() {
- return true;
- }
- };
-
- /**
- * Create a type token representing the given parameter. May use the type token associated
- * with the input and output types of the {@link DoFn}, depending on the extra
- * context.
- */
- abstract <InputT, OutputT> TypeToken<?> tokenFor(
- TypeToken<InputT> in, TypeToken<OutputT> out);
-
- /**
- * Indicates whether this enum is for testing only, hence should not appear in error messages,
- * etc. Defaults to {@code false}.
- */
- boolean isHidden() {
- return false;
- }
-
- private final Class<?> rawType;
- private final Availability availability;
- private final transient MethodDescription method;
-
- private AdditionalParameter(Availability availability, Class<?> rawType, String method) {
- this.availability = availability;
- this.rawType = rawType;
- try {
- this.method = new MethodDescription.ForLoadedMethod(
- ExtraContextFactory.class.getMethod(method));
- } catch (NoSuchMethodException | SecurityException e) {
- throw new RuntimeException(
- "Unable to access method " + method + " on " + ExtraContextFactory.class, e);
- }
- }
- }
-
- private static final Map<Class<?>, AdditionalParameter> EXTRA_CONTEXTS;
- private static final Map<Class<?>, AdditionalParameter> EXTRA_PROCESS_CONTEXTS;
-
- static {
- ImmutableMap.Builder<Class<?>, AdditionalParameter> everywhereBuilder =
- ImmutableMap.<Class<?>, AdditionalParameter>builder();
- ImmutableMap.Builder<Class<?>, AdditionalParameter> processElementBuilder =
- ImmutableMap.<Class<?>, AdditionalParameter>builder();
-
- for (AdditionalParameter value : AdditionalParameter.values()) {
- switch (value.availability) {
- case EVERYWHERE:
- everywhereBuilder.put(value.rawType, value);
- break;
- case PROCESS_ELEMENT_ONLY:
- processElementBuilder.put(value.rawType, value);
- break;
- }
- }
-
- EXTRA_CONTEXTS = everywhereBuilder.build();
- EXTRA_PROCESS_CONTEXTS = processElementBuilder
- // Process Element contexts include everything available everywhere
- .putAll(EXTRA_CONTEXTS)
- .build();
- }
-
- /**
- * @return true if the reflected {@link DoFn} uses a Single Window.
- */
- public abstract boolean usesSingleWindow();
-
- /** Create an {@link DoFnInvoker} bound to the given {@link OldDoFn}. */
- public abstract <InputT, OutputT> DoFnInvoker<InputT, OutputT> bindInvoker(
- DoFn<InputT, OutputT> fn);
-
- private static final Map<Class<?>, DoFnReflector> REFLECTOR_CACHE =
- new LinkedHashMap<Class<?>, DoFnReflector>();
-
- /**
- * @return the {@link DoFnReflector} for the given {@link DoFn}.
- */
- public static DoFnReflector of(
- @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) {
- DoFnReflector reflector = REFLECTOR_CACHE.get(fn);
- if (reflector != null) {
- return reflector;
- }
-
- reflector = new GenericDoFnReflector(fn);
- REFLECTOR_CACHE.put(fn, reflector);
- return reflector;
- }
-
- /**
- * Create a {@link OldDoFn} that the {@link DoFn}.
- */
- public <InputT, OutputT> OldDoFn<InputT, OutputT> toDoFn(DoFn<InputT, OutputT> fn) {
- if (usesSingleWindow()) {
- return new WindowDoFnAdapter<InputT, OutputT>(this, fn);
- } else {
- return new SimpleDoFnAdapter<InputT, OutputT>(this, fn);
- }
- }
-
- private static String formatType(TypeToken<?> t) {
- return ReflectHelpers.TYPE_SIMPLE_DESCRIPTION.apply(t.getType());
- }
-
- private static String format(Method m) {
- return ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(m);
- }
-
- private static Collection<String> describeSupportedTypes(
- Map<Class<?>, AdditionalParameter> extraProcessContexts,
- final TypeToken<?> in, final TypeToken<?> out) {
- return FluentIterable
- .from(extraProcessContexts.values())
- .filter(new Predicate<AdditionalParameter>() {
- @Override
- public boolean apply(@Nonnull AdditionalParameter additionalParameter) {
- return !additionalParameter.isHidden();
- }
- })
- .transform(new Function<AdditionalParameter, String>() {
- @Override
- @Nonnull
- public String apply(@Nonnull AdditionalParameter input) {
- return formatType(input.tokenFor(in, out));
- }
- })
- .toSortedSet(String.CASE_INSENSITIVE_ORDER);
- }
-
- @VisibleForTesting
- static <InputT, OutputT> List<AdditionalParameter> verifyProcessMethodArguments(Method m) {
- return verifyMethodArguments(m,
- EXTRA_PROCESS_CONTEXTS,
- new TypeToken<DoFn<InputT, OutputT>.ProcessContext>() {},
- new TypeParameter<InputT>() {},
- new TypeParameter<OutputT>() {});
- }
-
- @VisibleForTesting
- static <InputT, OutputT> List<AdditionalParameter> verifyBundleMethodArguments(Method m) {
- if (m == null) {
- return null;
- }
- return verifyMethodArguments(m,
- EXTRA_CONTEXTS,
- new TypeToken<DoFn<InputT, OutputT>.Context>() {},
- new TypeParameter<InputT>() {},
- new TypeParameter<OutputT>() {});
- }
-
- @VisibleForTesting
- static void verifyLifecycleMethodArguments(Method m) {
- if (m == null) {
- return;
- }
- checkState(void.class.equals(m.getReturnType()), "%s must have void return type", format(m));
- checkState(m.getGenericParameterTypes().length == 0, "%s must take zero arguments", format(m));
- }
-
- /**
- * Verify the method arguments for a given {@link DoFn} method.
- *
- * <p>The requirements for a method to be valid, are:
- * <ol>
- * <li>The method has at least one argument.
- * <li>The first argument is of type firstContextArg.
- * <li>The remaining arguments have raw types that appear in {@code contexts}
- * <li>Any generics on the extra context arguments match what is expected. Currently, this
- * is exercised only by placeholders. For example, {@code InputReceiver<InputT>} must either match
- * the {@code InputT} of {@code OldDoFn<InputT, OutputT>.ProcessContext} or use a wildcard, etc.
- * </ol>
- *
- * @param m the method to verify
- * @param contexts mapping from raw classes to the {@link AdditionalParameter} used
- * to create new instances.
- * @param firstContextArg the expected type of the first context argument
- * @param iParam TypeParameter representing the input type
- * @param oParam TypeParameter representing the output type
- */
- @VisibleForTesting
- static <InputT, OutputT> List<AdditionalParameter> verifyMethodArguments(
- Method m,
- Map<Class<?>, AdditionalParameter> contexts,
- TypeToken<?> firstContextArg,
- TypeParameter<InputT> iParam,
- TypeParameter<OutputT> oParam) {
-
- if (!void.class.equals(m.getReturnType())) {
- throw new IllegalStateException(String.format(
- "%s must have a void return type", format(m)));
- }
- if (m.isVarArgs()) {
- throw new IllegalStateException(String.format(
- "%s must not have var args", format(m)));
- }
-
- // The first parameter must be present, and must be the specified type
- Type[] params = m.getGenericParameterTypes();
- TypeToken<?> contextToken = null;
- if (params.length > 0) {
- contextToken = TypeToken.of(params[0]);
- }
- if (contextToken == null
- || !contextToken.getRawType().equals(firstContextArg.getRawType())) {
- throw new IllegalStateException(String.format(
- "%s must take a %s as its first argument",
- format(m), firstContextArg.getRawType().getSimpleName()));
- }
- AdditionalParameter[] contextInfos = new AdditionalParameter[params.length - 1];
-
- // Fill in the generics in the allExtraContextArgs interface from the types in the
- // Context or ProcessContext OldDoFn.
- ParameterizedType pt = (ParameterizedType) contextToken.getType();
- // We actually want the owner, since ProcessContext and Context are owned by DoFn.
- pt = (ParameterizedType) pt.getOwnerType();
- @SuppressWarnings("unchecked")
- TypeToken<InputT> iActual = (TypeToken<InputT>) TypeToken.of(pt.getActualTypeArguments()[0]);
- @SuppressWarnings("unchecked")
- TypeToken<OutputT> oActual = (TypeToken<OutputT>) TypeToken.of(pt.getActualTypeArguments()[1]);
-
- // All of the remaining parameters must be a super-interface of allExtraContextArgs
- // that is not listed in the EXCLUDED_INTERFACES set.
- for (int i = 1; i < params.length; i++) {
- TypeToken<?> param = TypeToken.of(params[i]);
-
- AdditionalParameter info = contexts.get(param.getRawType());
- if (info == null) {
- throw new IllegalStateException(String.format(
- "%s is not a valid context parameter for method %s. Should be one of %s",
- formatType(param), format(m),
- describeSupportedTypes(contexts, iActual, oActual)));
- }
-
- // If we get here, the class matches, but maybe the generics don't:
- TypeToken<?> expected = info.tokenFor(iActual, oActual);
- if (!expected.isSubtypeOf(param)) {
- throw new IllegalStateException(String.format(
- "Incompatible generics in context parameter %s for method %s. Should be %s",
- formatType(param), format(m), formatType(info.tokenFor(iActual, oActual))));
- }
-
- // Register the (now validated) context info
- contextInfos[i - 1] = info;
- }
- return ImmutableList.copyOf(contextInfos);
- }
-
- /** Interface for invoking the {@code OldDoFn} processing methods. */
- public interface DoFnInvoker<InputT, OutputT> {
- /** Invoke {@link OldDoFn#setup} on the bound {@code OldDoFn}. */
- void invokeSetup();
- /** Invoke {@link OldDoFn#startBundle} on the bound {@code OldDoFn}. */
- void invokeStartBundle(
- DoFn<InputT, OutputT>.Context c,
- ExtraContextFactory<InputT, OutputT> extra);
- /** Invoke {@link OldDoFn#finishBundle} on the bound {@code OldDoFn}. */
- void invokeFinishBundle(
- DoFn<InputT, OutputT>.Context c,
- ExtraContextFactory<InputT, OutputT> extra);
-
- /** Invoke {@link OldDoFn#teardown()} on the bound {@code DoFn}. */
- void invokeTeardown();
-
- /** Invoke {@link OldDoFn#processElement} on the bound {@code OldDoFn}. */
- public void invokeProcessElement(
- DoFn<InputT, OutputT>.ProcessContext c,
- ExtraContextFactory<InputT, OutputT> extra);
- }
-
- /**
- * Implementation of {@link DoFnReflector} for the arbitrary {@link DoFn}.
- */
- private static class GenericDoFnReflector extends DoFnReflector {
-
- private final Method setup;
- private final Method startBundle;
- private final Method processElement;
- private final Method finishBundle;
- private final Method teardown;
- private final List<AdditionalParameter> processElementArgs;
- private final List<AdditionalParameter> startBundleArgs;
- private final List<AdditionalParameter> finishBundleArgs;
- private final Constructor<?> constructor;
-
- private GenericDoFnReflector(
- @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) {
- // Locate the annotated methods
- this.processElement = findAnnotatedMethod(ProcessElement.class, fn, true);
- this.setup = findAnnotatedMethod(Setup.class, fn, false);
- this.startBundle = findAnnotatedMethod(StartBundle.class, fn, false);
- this.finishBundle = findAnnotatedMethod(FinishBundle.class, fn, false);
- this.teardown = findAnnotatedMethod(Teardown.class, fn, false);
-
- // Verify that their method arguments satisfy our conditions.
- this.processElementArgs = verifyProcessMethodArguments(processElement);
- this.startBundleArgs = verifyBundleMethodArguments(startBundle);
- this.finishBundleArgs = verifyBundleMethodArguments(finishBundle);
- verifyLifecycleMethodArguments(setup);
- verifyLifecycleMethodArguments(teardown);
-
- this.constructor = createInvokerConstructor(fn);
- }
-
- private static Collection<Method> declaredMethodsWithAnnotation(
- Class<? extends Annotation> anno,
- Class<?> startClass, Class<?> stopClass) {
- Collection<Method> matches = new ArrayList<>();
-
- Class<?> clazz = startClass;
- LinkedHashSet<Class<?>> interfaces = new LinkedHashSet<>();
-
- // First, find all declared methods on the startClass and parents (up to stopClass)
- while (clazz != null && !clazz.equals(stopClass)) {
- for (Method method : clazz.getDeclaredMethods()) {
- if (method.isAnnotationPresent(anno)) {
- matches.add(method);
- }
- }
-
- Collections.addAll(interfaces, clazz.getInterfaces());
-
- clazz = clazz.getSuperclass();
- }
-
- // Now, iterate over all the discovered interfaces
- for (Method method : ReflectHelpers.getClosureOfMethodsOnInterfaces(interfaces)) {
- if (method.isAnnotationPresent(anno)) {
- matches.add(method);
- }
- }
- return matches;
- }
-
- private static Method findAnnotatedMethod(
- Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
- Collection<Method> matches = declaredMethodsWithAnnotation(
- anno, fnClazz, DoFn.class);
-
- if (matches.size() == 0) {
- if (required) {
- throw new IllegalStateException(String.format(
- "No method annotated with @%s found in %s",
- anno.getSimpleName(), fnClazz.getName()));
- } else {
- return null;
- }
- }
-
- // If we have at least one match, then either it should be the only match
- // or it should be an extension of the other matches (which came from parent
- // classes).
- Method first = matches.iterator().next();
- for (Method other : matches) {
- if (!first.getName().equals(other.getName())
- || !Arrays.equals(first.getParameterTypes(), other.getParameterTypes())) {
- throw new IllegalStateException(String.format(
- "Found multiple methods annotated with @%s. [%s] and [%s]",
- anno.getSimpleName(), format(first), format(other)));
- }
- }
-
- // We need to be able to call it. We require it is public.
- if ((first.getModifiers() & Modifier.PUBLIC) == 0) {
- throw new IllegalStateException(format(first) + " must be public");
- }
-
- // And make sure it's not static.
- if ((first.getModifiers() & Modifier.STATIC) != 0) {
- throw new IllegalStateException(format(first) + " must not be static");
- }
-
- return first;
- }
-
- @Override
- public boolean usesSingleWindow() {
- return usesContext(AdditionalParameter.WINDOW_OF_ELEMENT);
- }
-
- private boolean usesContext(AdditionalParameter param) {
- return processElementArgs.contains(param)
- || (startBundleArgs != null && startBundleArgs.contains(param))
- || (finishBundleArgs != null && finishBundleArgs.contains(param));
- }
-
- /**
- * Use ByteBuddy to generate the code for a {@link DoFnInvoker} that invokes the given
- * {@link DoFn}.
- * @param clazz
- * @return
- */
- private Constructor<? extends DoFnInvoker<?, ?>> createInvokerConstructor(
- @SuppressWarnings("rawtypes") Class<? extends DoFn> clazz) {
-
- final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(clazz);
-
- DynamicType.Builder<?> builder = new ByteBuddy()
- // Create subclasses inside the target class, to have access to
- // private and package-private bits
- .with(new SuffixingRandom("auxiliary") {
- @Override
- public String subclass(Generic superClass) {
- return super.name(clazzDescription);
- }
- })
- // Create a subclass of DoFnInvoker
- .subclass(DoFnInvoker.class, Default.NO_CONSTRUCTORS)
- .defineField(FN_DELEGATE_FIELD_NAME, clazz, Visibility.PRIVATE, FieldManifestation.FINAL)
- // Define a constructor to populate fields appropriately.
- .defineConstructor(Visibility.PUBLIC)
- .withParameter(clazz)
- .intercept(new InvokerConstructor())
- // Implement the five invoker methods by calling into the appropriate functions on the fn.
- .method(ElementMatchers.named("invokeProcessElement"))
- .intercept(InvokerDelegation.create(
- processElement, BeforeDelegation.NOOP, processElementArgs))
- .method(ElementMatchers.named("invokeStartBundle"))
- .intercept(InvokerDelegation.create(
- startBundle, BeforeDelegation.INVOKE_PREPARE_FOR_PROCESSING, startBundleArgs))
- .method(ElementMatchers.named("invokeFinishBundle"))
- .intercept(InvokerDelegation.create(finishBundle,
- BeforeDelegation.NOOP,
- finishBundleArgs))
- .method(ElementMatchers.named("invokeSetup"))
- .intercept(InvokerDelegation.create(setup,
- BeforeDelegation.NOOP,
- Collections.<AdditionalParameter>emptyList()))
- .method(ElementMatchers.named("invokeTeardown"))
- .intercept(InvokerDelegation.create(teardown,
- BeforeDelegation.NOOP,
- Collections.<AdditionalParameter>emptyList()));
-
- @SuppressWarnings("unchecked")
- Class<? extends DoFnInvoker<?, ?>> dynamicClass = (Class<? extends DoFnInvoker<?, ?>>) builder
- .make()
- .load(getClass().getClassLoader(), ClassLoadingStrategy.Default.INJECTION)
- .getLoaded();
- try {
- return dynamicClass.getConstructor(clazz);
- } catch (IllegalArgumentException
- | NoSuchMethodException
- | SecurityException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public <InputT, OutputT> DoFnInvoker<InputT, OutputT> bindInvoker(
- DoFn<InputT, OutputT> fn) {
- try {
- @SuppressWarnings("unchecked")
- DoFnInvoker<InputT, OutputT> invoker =
- (DoFnInvoker<InputT, OutputT>) constructor.newInstance(fn);
- return invoker;
- } catch (InstantiationException
- | IllegalAccessException
- | IllegalArgumentException
- | InvocationTargetException
- | SecurityException e) {
- throw new RuntimeException("Unable to bind invoker for " + fn.getClass(), e);
- }
- }
- }
-
- private static class ContextAdapter<InputT, OutputT>
- extends DoFn<InputT, OutputT>.Context
- implements DoFn.ExtraContextFactory<InputT, OutputT> {
-
- private OldDoFn<InputT, OutputT>.Context context;
-
- private ContextAdapter(
- DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
- fn.super();
- this.context = context;
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return context.getPipelineOptions();
- }
-
- @Override
- public void output(OutputT output) {
- context.output(output);
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- context.outputWithTimestamp(output, timestamp);
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- context.sideOutput(tag, output);
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- context.sideOutputWithTimestamp(tag, output, timestamp);
- }
-
- @Override
- public BoundedWindow window() {
- // The DoFn doesn't allow us to ask for these outside ProcessElements, so this
- // should be unreachable.
- throw new UnsupportedOperationException("Can only get the window in ProcessElements");
- }
-
- @Override
- public DoFn.InputProvider<InputT> inputProvider() {
- throw new UnsupportedOperationException("inputProvider() exists only for testing");
- }
-
- @Override
- public DoFn.OutputReceiver<OutputT> outputReceiver() {
- throw new UnsupportedOperationException("outputReceiver() exists only for testing");
- }
- }
-
- private static class ProcessContextAdapter<InputT, OutputT>
- extends DoFn<InputT, OutputT>.ProcessContext
- implements DoFn.ExtraContextFactory<InputT, OutputT> {
-
- private OldDoFn<InputT, OutputT>.ProcessContext context;
-
- private ProcessContextAdapter(
- DoFn<InputT, OutputT> fn,
- OldDoFn<InputT, OutputT>.ProcessContext context) {
- fn.super();
- this.context = context;
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return context.getPipelineOptions();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return context.sideInput(view);
- }
-
- @Override
- public void output(OutputT output) {
- context.output(output);
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- context.outputWithTimestamp(output, timestamp);
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- context.sideOutput(tag, output);
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- context.sideOutputWithTimestamp(tag, output, timestamp);
- }
-
- @Override
- public InputT element() {
- return context.element();
- }
-
- @Override
- public Instant timestamp() {
- return context.timestamp();
- }
-
- @Override
- public PaneInfo pane() {
- return context.pane();
- }
-
- @Override
- public BoundedWindow window() {
- return context.window();
- }
-
- @Override
- public DoFn.InputProvider<InputT> inputProvider() {
- throw new UnsupportedOperationException("inputProvider() exists only for testing");
- }
-
- @Override
- public DoFn.OutputReceiver<OutputT> outputReceiver() {
- throw new UnsupportedOperationException("outputReceiver() exists only for testing");
- }
- }
-
- public static Class<?> getDoFnClass(OldDoFn<?, ?> fn) {
- if (fn instanceof SimpleDoFnAdapter) {
- return ((SimpleDoFnAdapter<?, ?>) fn).fn.getClass();
- } else {
- return fn.getClass();
- }
- }
-
- private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
-
- private final DoFn<InputT, OutputT> fn;
- private transient DoFnInvoker<InputT, OutputT> invoker;
-
- private SimpleDoFnAdapter(DoFnReflector reflector, DoFn<InputT, OutputT> fn) {
- super(fn.aggregators);
- this.fn = fn;
- this.invoker = reflector.bindInvoker(fn);
- }
-
- @Override
- public void setup() throws Exception {
- invoker.invokeSetup();
- }
-
- @Override
- public void startBundle(OldDoFn<InputT, OutputT>.Context c) throws Exception {
- ContextAdapter<InputT, OutputT> adapter = new ContextAdapter<>(fn, c);
- invoker.invokeStartBundle(adapter, adapter);
- }
-
- @Override
- public void finishBundle(OldDoFn<InputT, OutputT>.Context c) throws Exception {
- ContextAdapter<InputT, OutputT> adapter = new ContextAdapter<>(fn, c);
- invoker.invokeFinishBundle(adapter, adapter);
- }
-
- @Override
- public void teardown() {
- invoker.invokeTeardown();
- }
-
- @Override
- public void processElement(OldDoFn<InputT, OutputT>.ProcessContext c) throws Exception {
- ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
- invoker.invokeProcessElement(adapter, adapter);
- }
-
- @Override
- protected TypeDescriptor<InputT> getInputTypeDescriptor() {
- return fn.getInputTypeDescriptor();
- }
-
- @Override
- protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
- return fn.getOutputTypeDescriptor();
- }
-
- @Override
- public Duration getAllowedTimestampSkew() {
- return fn.getAllowedTimestampSkew();
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.include(fn);
- }
-
- private void readObject(java.io.ObjectInputStream in)
- throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- invoker = DoFnReflector.of(fn.getClass()).bindInvoker(fn);
- }
- }
-
- private static class WindowDoFnAdapter<InputT, OutputT>
- extends SimpleDoFnAdapter<InputT, OutputT> implements OldDoFn.RequiresWindowAccess {
-
- private WindowDoFnAdapter(DoFnReflector reflector, DoFn<InputT, OutputT> fn) {
- super(reflector, fn);
- }
- }
-
- private static enum BeforeDelegation {
- NOOP {
- @Override
- StackManipulation manipulation(
- TypeDescription delegateType, MethodDescription instrumentedMethod, boolean finalStep) {
- Preconditions.checkArgument(!finalStep,
- "Shouldn't use NOOP delegation if there is nothing to do afterwards.");
- return StackManipulation.Trivial.INSTANCE;
- }
- },
- INVOKE_PREPARE_FOR_PROCESSING {
- private final Assigner assigner = Assigner.DEFAULT;
-
- @Override
- StackManipulation manipulation(
- TypeDescription delegateType, MethodDescription instrumentedMethod, boolean finalStep) {
- MethodDescription prepareMethod;
- try {
- prepareMethod = new MethodLocator.ForExplicitMethod(
- new MethodDescription.ForLoadedMethod(
- DoFn.class.getDeclaredMethod("prepareForProcessing")))
- .resolve(instrumentedMethod);
- } catch (NoSuchMethodException | SecurityException e) {
- throw new RuntimeException("Unable to locate prepareForProcessing method", e);
- }
-
- if (finalStep) {
- return new StackManipulation.Compound(
- // Invoke the prepare method
- MethodInvoker.Simple.INSTANCE.invoke(prepareMethod),
- // Return from the invokeStartBundle when we're done.
- TerminationHandler.Returning.INSTANCE.resolve(
- assigner, instrumentedMethod, prepareMethod));
- } else {
- return new StackManipulation.Compound(
- // Duplicate the delegation target so that it remains after we invoke prepare
- Duplication.duplicate(delegateType),
- // Invoke the prepare method
- MethodInvoker.Simple.INSTANCE.invoke(prepareMethod),
- // Drop the return value from prepareForProcessing
- TerminationHandler.Dropping.INSTANCE.resolve(
- assigner, instrumentedMethod, prepareMethod));
- }
- }
- };
-
- /**
- * Stack manipulation to perform prior to the delegate call.
- *
- * <ul>
- * <li>Precondition: Stack has the delegate target on top of the stack
- * <li>Postcondition: If finalStep is true, then we've returned from the method. Otherwise, the
- * stack still has the delegate target on top of the stack.
- * </ul>
- *
- * @param delegateType The type of the delegate target, in case it needs to be duplicated.
- * @param instrumentedMethod The method being instrumented. Necessary for resolving types and
- * other information.
- * @param finalStep If true, return from the {@code invokeStartBundle} method after invoking
- * {@code prepareForProcessing} on the delegate.
- */
- abstract StackManipulation manipulation(
- TypeDescription delegateType, MethodDescription instrumentedMethod, boolean finalStep);
- }
-
- /**
- * A byte-buddy {@link Implementation} that delegates a call that receives
- * {@link AdditionalParameter} to the given {@link DoFn} method.
- */
- private static final class InvokerDelegation implements Implementation {
- @Nullable
- private final Method target;
- private final BeforeDelegation before;
- private final List<AdditionalParameter> args;
- private final Assigner assigner = Assigner.DEFAULT;
- private FieldDescription field;
-
- /**
- * Create the {@link InvokerDelegation} for the specified method.
- *
- * @param target the method to delegate to
- * @param before the {@link BeforeDelegation} step to perform prior to the delegate call
- * @param args the {@link AdditionalParameter} to be passed to the {@code target}
- */
- private InvokerDelegation(
- @Nullable Method target,
- BeforeDelegation before,
- List<AdditionalParameter> args) {
- this.target = target;
- this.before = before;
- this.args = args;
- }
-
- /**
- * Generate the {@link Implementation} of one of the life-cycle methods of a
- * {@link DoFn}.
- */
- private static Implementation create(
- @Nullable final Method target, BeforeDelegation before, List<AdditionalParameter> args) {
- if (target == null && before == BeforeDelegation.NOOP) {
- // There is no target to call and nothing needs to happen before. Just produce a stub.
- return StubMethod.INSTANCE;
- } else {
- // We need to generate a non-empty method implementation.
- return new InvokerDelegation(target, before, args);
- }
- }
-
- @Override
- public InstrumentedType prepare(InstrumentedType instrumentedType) {
- // Remember the field description of the instrumented type.
- field = instrumentedType.getDeclaredFields()
- .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME)).getOnly();
-
- // Delegating the method call doesn't require any changes to the instrumented type.
- return instrumentedType;
- }
-
- /**
- * Stack manipulation to push the {@link DoFn} reference stored in the
- * delegate field of the invoker on to the top of the stack.
- *
- * <p>This implementation is derived from the code for
- * {@code MethodCall.invoke(m).onInstanceField(clazz, delegateField)} with two key differences.
- * First, it doesn't add a synthetic field each time, which is critical to avoid duplicate field
- * definitions. Second, it uses the {@link AdditionalParameter} to populate the arguments to the
- * method.
- */
- private StackManipulation pushDelegateField() {
- return new StackManipulation.Compound(
- // Push "this" reference to the stack
- MethodVariableAccess.REFERENCE.loadOffset(0),
- // Access the delegate field of the invoker
- FieldAccess.forField(field).getter());
- }
-
- private StackManipulation pushArgument(
- AdditionalParameter arg, MethodDescription instrumentedMethod) {
- MethodDescription transform = arg.method;
-
- return new StackManipulation.Compound(
- // Push the ExtraContextFactory which must have been argument 2 of the instrumented method
- MethodVariableAccess.REFERENCE.loadOffset(2),
- // Invoke the appropriate method to produce the context argument
- MethodInvocation.invoke(transform));
- }
-
- private StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
- MethodDescription targetMethod = new MethodLocator.ForExplicitMethod(
- new MethodDescription.ForLoadedMethod(target)).resolve(instrumentedMethod);
- ParameterList<?> params = targetMethod.getParameters();
-
- List<StackManipulation> parameters;
- if (!params.isEmpty()) {
- // Instructions to setup the parameters for the call
- parameters = new ArrayList<>(args.size() + 1);
- // 1. The first argument in the delegate method must be the context. This corresponds to
- // the first argument in the instrumented method, so copy that.
- parameters.add(MethodVariableAccess.of(params.get(0).getType().getSuperClass())
- .loadOffset(1));
- // 2. For each of the extra arguments push the appropriate value.
- for (AdditionalParameter arg : args) {
- parameters.add(pushArgument(arg, instrumentedMethod));
- }
- } else {
- parameters = Collections.emptyList();
- }
-
- return new StackManipulation.Compound(
- // Push the parameters
- new StackManipulation.Compound(parameters),
- // Invoke the target method
- wrapWithUserCodeException(MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
- // Return from the instrumented method
- TerminationHandler.Returning.INSTANCE.resolve(
- assigner, instrumentedMethod, targetMethod));
- }
-
- /**
- * Wrap a given stack manipulation in a try catch block. Any exceptions thrown within the
- * try are wrapped with a {@link UserCodeException}.
- */
- private StackManipulation wrapWithUserCodeException(
- final StackManipulation tryBody) {
- final MethodDescription createUserCodeException;
- try {
- createUserCodeException = new MethodDescription.ForLoadedMethod(
- UserCodeException.class.getDeclaredMethod("wrap", Throwable.class));
- } catch (NoSuchMethodException | SecurityException e) {
- throw new RuntimeException("Unable to find UserCodeException.wrap", e);
- }
-
- return new StackManipulation() {
- @Override
- public boolean isValid() {
- return tryBody.isValid();
- }
-
- @Override
- public Size apply(MethodVisitor mv, Context implementationContext) {
- Label tryBlockStart = new Label();
- Label tryBlockEnd = new Label();
- Label catchBlockStart = new Label();
- Label catchBlockEnd = new Label();
-
- String throwableName =
- new TypeDescription.ForLoadedType(Throwable.class).getInternalName();
- mv.visitTryCatchBlock(tryBlockStart, tryBlockEnd, catchBlockStart, throwableName);
-
- // The try block attempts to perform the expected operations, then jumps to success
- mv.visitLabel(tryBlockStart);
- Size trySize = tryBody.apply(mv, implementationContext);
- mv.visitJumpInsn(Opcodes.GOTO, catchBlockEnd);
- mv.visitLabel(tryBlockEnd);
-
- // The handler wraps the exception, and then throws.
- mv.visitLabel(catchBlockStart);
- // Add the exception to the frame
- mv.visitFrame(Opcodes.F_SAME1,
- // No local variables
- 0, new Object[] {},
- // 1 stack element (the throwable)
- 1, new Object[] { throwableName });
-
- Size catchSize = new StackManipulation.Compound(
- MethodInvocation.invoke(createUserCodeException),
- Throw.INSTANCE)
- .apply(mv, implementationContext);
-
- mv.visitLabel(catchBlockEnd);
- // The frame contents after the try/catch block is the same
- // as it was before.
- mv.visitFrame(Opcodes.F_SAME,
- // No local variables
- 0, new Object[] {},
- // No new stack variables
- 0, new Object[] {});
-
- return new Size(
- trySize.getSizeImpact(),
- Math.max(trySize.getMaximalSize(), catchSize.getMaximalSize()));
- }
- };
- }
-
- @Override
- public ByteCodeAppender appender(final Target implementationTarget) {
- return new ByteCodeAppender() {
- @Override
- public Size apply(
- MethodVisitor methodVisitor,
- Context implementationContext,
- MethodDescription instrumentedMethod) {
- StackManipulation.Size size = new StackManipulation.Compound(
- // Put the target on top of the stack
- pushDelegateField(),
- // Do any necessary pre-delegation work
- before.manipulation(field.getType().asErasure(), instrumentedMethod, target == null),
- // Invoke the target method, if there is one. If there wasn't, then the before step ran as the
- // final step (target == null), and it has already emitted the appropriate return instructions.
- target != null
- ? invokeTargetMethod(instrumentedMethod)
- : StackManipulation.Trivial.INSTANCE)
- .apply(methodVisitor, implementationContext);
- return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
- }
- };
- }
- }
-
- /**
- * A constructor {@link Implementation} for a {@link DoFnInvoker class}. Produces the byte code
- * for a constructor that takes a single argument and assigns it to the delegate field.
- * {@link AdditionalParameter} to the given {@link DoFn} method.
- */
- private static final class InvokerConstructor implements Implementation {
- @Override
- public InstrumentedType prepare(InstrumentedType instrumentedType) {
- return instrumentedType;
- }
-
- @Override
- public ByteCodeAppender appender(final Target implementationTarget) {
- return new ByteCodeAppender() {
- @Override
- public Size apply(
- MethodVisitor methodVisitor,
- Context implementationContext,
- MethodDescription instrumentedMethod) {
- StackManipulation.Size size = new StackManipulation.Compound(
- // Load the this reference
- MethodVariableAccess.REFERENCE.loadOffset(0),
- // Invoke the super constructor (default constructor of Object)
- MethodInvocation
- .invoke(new TypeDescription.ForLoadedType(Object.class)
- .getDeclaredMethods()
- .filter(ElementMatchers.isConstructor()
- .and(ElementMatchers.takesArguments(0)))
- .getOnly()),
- // Load the this reference
- MethodVariableAccess.REFERENCE.loadOffset(0),
- // Load the delegate argument
- MethodVariableAccess.REFERENCE.loadOffset(1),
- // Assign the delegate argument to the delegate field
- FieldAccess.forField(implementationTarget.getInstrumentedType()
- .getDeclaredFields()
- .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
- .getOnly()).putter(),
- // Return void.
- MethodReturn.VOID
- ).apply(methodVisitor, implementationContext);
- return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
- }
- };
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index dd1baab..4cd410a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -92,7 +92,7 @@ public class DoFnTester<InputT, OutputT> {
@SuppressWarnings("unchecked")
public static <InputT, OutputT> DoFnTester<InputT, OutputT>
of(DoFn<InputT, OutputT> fn) {
- return new DoFnTester<InputT, OutputT>(DoFnReflector.of(fn.getClass()).toDoFn(fn));
+ return new DoFnTester<InputT, OutputT>(DoFnAdapters.toOldDoFn(fn));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index aa57531..af500ba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -536,7 +536,7 @@ public class ParDo {
private static <InputT, OutputT> OldDoFn<InputT, OutputT>
adapt(DoFn<InputT, OutputT> fn) {
- return DoFnReflector.of(fn.getClass()).toDoFn(fn);
+ return DoFnAdapters.toOldDoFn(fn);
}
/**
@@ -747,7 +747,7 @@ public class ParDo {
@Override
protected String getKindString() {
- Class<?> clazz = DoFnReflector.getDoFnClass(fn);
+ Class<?> clazz = DoFnAdapters.getDoFnClass(fn);
if (clazz.isAnonymousClass()) {
return "AnonymousParDo";
} else {
@@ -968,7 +968,7 @@ public class ParDo {
@Override
protected String getKindString() {
- Class<?> clazz = DoFnReflector.getDoFnClass(fn);
+ Class<?> clazz = DoFnAdapters.getDoFnClass(fn);
if (clazz.isAnonymousClass()) {
return "AnonymousParMultiDo";
} else {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
new file mode 100644
index 0000000..5818a59
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Interface for invoking the {@code DoFn} processing methods.
+ *
+ * <p>Instantiating a {@link DoFnInvoker} associates it with a specific {@link DoFn} instance,
+ * referred to as the bound {@link DoFn}.
+ */
+public interface DoFnInvoker<InputT, OutputT> {
+ /**
+ * Invoke the {@link DoFn.Setup} method on the bound {@link DoFn}.
+ */
+ void invokeSetup();
+
+ /**
+ * Invoke the {@link DoFn.StartBundle} method on the bound {@link DoFn}.
+ *
+ * @param c The {@link DoFn.Context} to invoke the fn with.
+ */
+ void invokeStartBundle(DoFn<InputT, OutputT>.Context c);
+
+ /**
+ * Invoke the {@link DoFn.FinishBundle} method on the bound {@link DoFn}.
+ *
+ * @param c The {@link DoFn.Context} to invoke the fn with.
+ */
+ void invokeFinishBundle(DoFn<InputT, OutputT>.Context c);
+
+ /**
+ * Invoke the {@link DoFn.Teardown} method on the bound {@link DoFn}.
+ */
+ void invokeTeardown();
+
+ /**
+ * Invoke the {@link DoFn.ProcessElement} method on the bound {@link DoFn}.
+ *
+ * @param c The {@link DoFn.ProcessContext} to invoke the fn with.
+ * @param extra Factory for producing extra parameter objects (such as window), if necessary.
+ */
+ void invokeProcessElement(
+ DoFn<InputT, OutputT>.ProcessContext c, DoFn.ExtraContextFactory<InputT, OutputT> extra);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
new file mode 100644
index 0000000..73874d7
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.DoFn.Setup;
+import org.apache.beam.sdk.transforms.DoFn.StartBundle;
+import org.apache.beam.sdk.transforms.DoFn.Teardown;
+import org.apache.beam.sdk.util.UserCodeException;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.NamingStrategy;
+import net.bytebuddy.description.field.FieldDescription;
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.description.modifier.FieldManifestation;
+import net.bytebuddy.description.modifier.Visibility;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.DynamicType;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.dynamic.scaffold.InstrumentedType;
+import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
+import net.bytebuddy.implementation.Implementation;
+import net.bytebuddy.implementation.MethodCall;
+import net.bytebuddy.implementation.bind.MethodDelegationBinder;
+import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
+import net.bytebuddy.implementation.bytecode.StackManipulation;
+import net.bytebuddy.implementation.bytecode.Throw;
+import net.bytebuddy.implementation.bytecode.member.FieldAccess;
+import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import net.bytebuddy.implementation.bytecode.member.MethodReturn;
+import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
+import net.bytebuddy.jar.asm.Label;
+import net.bytebuddy.jar.asm.MethodVisitor;
+import net.bytebuddy.jar.asm.Opcodes;
+import net.bytebuddy.matcher.ElementMatchers;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/** Dynamically generates {@link DoFnInvoker} instances for invoking a {@link DoFn}. */
+public class DoFnInvokers {
+ public static final DoFnInvokers INSTANCE = new DoFnInvokers();
+
+ private static final String FN_DELEGATE_FIELD_NAME = "delegate";
+
+ /**
+ * A cache of constructors of generated {@link DoFnInvoker} classes, keyed by {@link DoFn} class.
+ * Needed because generating an invoker class is expensive, and to avoid generating an excessive
+ * number of classes consuming PermGen memory.
+ */
+ private final Map<Class<?>, Constructor<?>> byteBuddyInvokerConstructorCache =
+ new LinkedHashMap<>();
+
+ private DoFnInvokers() {}
+
+ /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
+ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
+ DoFn<InputT, OutputT> fn) {
+ return newByteBuddyInvoker(DoFnSignatures.INSTANCE.getOrParseSignature(fn.getClass()), fn);
+ }
+
+ /** @return a new {@link DoFnInvoker} for {@code fn}, whose class must match {@code signature}. */
+ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
+ DoFnSignature signature, DoFn<InputT, OutputT> fn) {
+ checkArgument(
+ signature.fnClass().equals(fn.getClass()),
+ "Signature is for class %s, but fn is of class %s",
+ signature.fnClass(),
+ fn.getClass());
+ try {
+ @SuppressWarnings("unchecked")
+ DoFnInvoker<InputT, OutputT> invoker =
+ (DoFnInvoker<InputT, OutputT>)
+ getOrGenerateByteBuddyInvokerConstructor(signature).newInstance(fn);
+ return invoker;
+ } catch (InstantiationException
+ | IllegalAccessException
+ | IllegalArgumentException
+ | InvocationTargetException
+ | SecurityException e) {
+ throw new RuntimeException("Unable to bind invoker for " + fn.getClass(), e);
+ }
+ }
+
+ /**
+ * Returns a generated constructor for a {@link DoFnInvoker} for the given {@link DoFn} class and
+ * caches it.
+ */
+ private synchronized Constructor<?> getOrGenerateByteBuddyInvokerConstructor(
+ DoFnSignature signature) {
+ Class<? extends DoFn> fnClass = signature.fnClass();
+ Constructor<?> constructor = byteBuddyInvokerConstructorCache.get(fnClass);
+ if (constructor == null) {
+ Class<? extends DoFnInvoker<?, ?>> invokerClass = generateInvokerClass(signature);
+ try {
+ constructor = invokerClass.getConstructor(fnClass);
+ } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
+ throw new RuntimeException(e);
+ }
+ byteBuddyInvokerConstructorCache.put(fnClass, constructor);
+ }
+ return constructor;
+ }
+
+ /** Generates a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */
+ private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(DoFnSignature signature) {
+ Class<? extends DoFn> fnClass = signature.fnClass();
+
+ final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass);
+
+ DynamicType.Builder<?> builder =
+ new ByteBuddy()
+ // Create subclasses inside the target class, to have access to
+ // private and package-private bits
+ .with(
+ new NamingStrategy.SuffixingRandom("auxiliary") {
+ @Override
+ public String subclass(TypeDescription.Generic superClass) {
+ return super.name(clazzDescription);
+ }
+ })
+ // Create a subclass of DoFnInvoker
+ .subclass(DoFnInvoker.class, ConstructorStrategy.Default.NO_CONSTRUCTORS)
+ .defineField(
+ FN_DELEGATE_FIELD_NAME, fnClass, Visibility.PRIVATE, FieldManifestation.FINAL)
+ .defineConstructor(Visibility.PUBLIC)
+ .withParameter(fnClass)
+ .intercept(new InvokerConstructor())
+ .method(ElementMatchers.named("invokeProcessElement"))
+ .intercept(new ProcessElementDelegation(signature.processElement()))
+ .method(ElementMatchers.named("invokeStartBundle"))
+ .intercept(
+ signature.startBundle() == null
+ ? new NoopMethodImplementation()
+ : new BundleMethodDelegation(signature.startBundle()))
+ .method(ElementMatchers.named("invokeFinishBundle"))
+ .intercept(
+ signature.finishBundle() == null
+ ? new NoopMethodImplementation()
+ : new BundleMethodDelegation(signature.finishBundle()))
+ .method(ElementMatchers.named("invokeSetup"))
+ .intercept(
+ signature.setup() == null
+ ? new NoopMethodImplementation()
+ : new LifecycleMethodDelegation(signature.setup()))
+ .method(ElementMatchers.named("invokeTeardown"))
+ .intercept(
+ signature.teardown() == null
+ ? new NoopMethodImplementation()
+ : new LifecycleMethodDelegation(signature.teardown()));
+
+ DynamicType.Unloaded<?> unloaded = builder.make();
+
+ @SuppressWarnings("unchecked")
+ Class<? extends DoFnInvoker<?, ?>> res =
+ (Class<? extends DoFnInvoker<?, ?>>)
+ unloaded
+ .load(DoFnInvokers.class.getClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+ .getLoaded();
+ return res;
+ }
+
+ /** Implements an invoker method by doing nothing and immediately returning void. */
+ private static class NoopMethodImplementation implements Implementation {
+ @Override
+ public InstrumentedType prepare(InstrumentedType instrumentedType) {
+ return instrumentedType;
+ }
+
+ @Override
+ public ByteCodeAppender appender(final Target implementationTarget) {
+ return new ByteCodeAppender() {
+ @Override
+ public Size apply(
+ MethodVisitor methodVisitor,
+ Context implementationContext,
+ MethodDescription instrumentedMethod) {
+ StackManipulation manipulation = MethodReturn.VOID;
+ StackManipulation.Size size = manipulation.apply(methodVisitor, implementationContext);
+ return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
+ }
+ };
+ }
+ }
+
+ /**
+ * Base class for implementing an invoker method by delegating to a method of the target {@link
+ * DoFn}.
+ */
+ private abstract static class MethodDelegation implements Implementation {
+ FieldDescription delegateField;
+
+ @Override
+ public InstrumentedType prepare(InstrumentedType instrumentedType) {
+ // Remember the field description of the instrumented type.
+ delegateField =
+ instrumentedType
+ .getDeclaredFields()
+ .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
+ .getOnly();
+
+ // Delegating the method call doesn't require any changes to the instrumented type.
+ return instrumentedType;
+ }
+
+ @Override
+ public ByteCodeAppender appender(final Target implementationTarget) {
+ return new ByteCodeAppender() {
+ @Override
+ public Size apply(
+ MethodVisitor methodVisitor,
+ Context implementationContext,
+ MethodDescription instrumentedMethod) {
+ StackManipulation manipulation =
+ new StackManipulation.Compound(
+ // Push "this" reference to the stack
+ MethodVariableAccess.REFERENCE.loadOffset(0),
+ // Access the delegate field of the invoker
+ FieldAccess.forField(delegateField).getter(),
+ invokeTargetMethod(instrumentedMethod));
+ StackManipulation.Size size = manipulation.apply(methodVisitor, implementationContext);
+ return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
+ }
+ };
+ }
+
+ /**
+ * Generates code to invoke the target method. When this is called the delegate field will be on
+ * top of the stack. This should add any necessary arguments to the stack and then perform the
+ * method invocation.
+ */
+ protected abstract StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod);
+ }
+
+ /**
+ * Implements the invoker's {@link DoFnInvoker#invokeProcessElement} method by delegating to the
+ * {@link DoFn.ProcessElement} method.
+ */
+ private static final class ProcessElementDelegation extends MethodDelegation {
+ private static final Map<DoFnSignature.ProcessElementMethod.Parameter, MethodDescription>
+ EXTRA_CONTEXT_FACTORY_METHODS;
+
+ static {
+ try {
+ Map<DoFnSignature.ProcessElementMethod.Parameter, MethodDescription> methods =
+ new EnumMap<>(DoFnSignature.ProcessElementMethod.Parameter.class);
+ methods.put(
+ DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW,
+ new MethodDescription.ForLoadedMethod(
+ DoFn.ExtraContextFactory.class.getMethod("window")));
+ methods.put(
+ DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER,
+ new MethodDescription.ForLoadedMethod(
+ DoFn.ExtraContextFactory.class.getMethod("inputProvider")));
+ methods.put(
+ DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER,
+ new MethodDescription.ForLoadedMethod(
+ DoFn.ExtraContextFactory.class.getMethod("outputReceiver")));
+ EXTRA_CONTEXT_FACTORY_METHODS = Collections.unmodifiableMap(methods);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to locate an ExtraContextFactory method that was expected to exist", e);
+ }
+ }
+
+ private final DoFnSignature.ProcessElementMethod signature;
+
+ /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */
+ private ProcessElementDelegation(DoFnSignature.ProcessElementMethod signature) {
+ this.signature = signature;
+ }
+
+ @Override
+ protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
+ MethodDescription targetMethod =
+ new MethodCall.MethodLocator.ForExplicitMethod(
+ new MethodDescription.ForLoadedMethod(signature.targetMethod()))
+ .resolve(instrumentedMethod);
+
+ // Parameters of the wrapper invoker method:
+ // DoFn.ProcessContext, ExtraContextFactory.
+ // Parameters of the wrapped DoFn method:
+ // DoFn.ProcessContext, [BoundedWindow, InputProvider, OutputReceiver] in any order
+ ArrayList<StackManipulation> parameters = new ArrayList<>();
+ // Push the ProcessContext argument.
+ parameters.add(MethodVariableAccess.REFERENCE.loadOffset(1));
+ // Push the extra arguments in their actual order.
+ StackManipulation pushExtraContextFactory = MethodVariableAccess.REFERENCE.loadOffset(2);
+ for (DoFnSignature.ProcessElementMethod.Parameter param : signature.extraParameters()) {
+ parameters.add(
+ new StackManipulation.Compound(
+ pushExtraContextFactory,
+ MethodInvocation.invoke(EXTRA_CONTEXT_FACTORY_METHODS.get(param))));
+ }
+
+ return new StackManipulation.Compound(
+ // Push the parameters
+ new StackManipulation.Compound(parameters),
+ // Invoke the target method
+ wrapWithUserCodeException(
+ MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
+ // Return from the instrumented method
+ MethodReturn.VOID);
+ }
+ }
+
+ /**
+ * Implements {@link DoFnInvoker#invokeStartBundle} or {@link DoFnInvoker#invokeFinishBundle} by
+ * delegating respectively to the {@link StartBundle} and {@link FinishBundle} methods.
+ */
+ private static final class BundleMethodDelegation extends MethodDelegation {
+ private final DoFnSignature.BundleMethod signature;
+
+ private BundleMethodDelegation(@Nullable DoFnSignature.BundleMethod signature) {
+ this.signature = signature;
+ }
+
+ @Override
+ protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
+ MethodDescription targetMethod =
+ new MethodCall.MethodLocator.ForExplicitMethod(
+ new MethodDescription.ForLoadedMethod(checkNotNull(signature).targetMethod()))
+ .resolve(instrumentedMethod);
+ return new StackManipulation.Compound(
+ // Push the parameters
+ MethodVariableAccess.REFERENCE.loadOffset(1),
+ // Invoke the target method
+ wrapWithUserCodeException(
+ MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
+ MethodReturn.VOID);
+ }
+ }
+
+ /**
+ * Implements {@link DoFnInvoker#invokeSetup} or {@link DoFnInvoker#invokeTeardown} by delegating
+ * respectively to the {@link Setup} and {@link Teardown} methods.
+ */
+ private static final class LifecycleMethodDelegation extends MethodDelegation {
+ private final DoFnSignature.LifecycleMethod signature;
+
+ private LifecycleMethodDelegation(@Nullable DoFnSignature.LifecycleMethod signature) {
+ this.signature = signature;
+ }
+
+ @Override
+ protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) {
+ MethodDescription targetMethod =
+ new MethodCall.MethodLocator.ForExplicitMethod(
+ new MethodDescription.ForLoadedMethod(checkNotNull(signature).targetMethod()))
+ .resolve(instrumentedMethod);
+ return new StackManipulation.Compound(
+ wrapWithUserCodeException(
+ MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)),
+ MethodReturn.VOID);
+ }
+ }
+
+ /**
+ * Wraps a given stack manipulation in a try catch block. Any exceptions thrown within the try are
+ * wrapped with a {@link UserCodeException}.
+ */
+ private static StackManipulation wrapWithUserCodeException(final StackManipulation tryBody) {
+ final MethodDescription createUserCodeException;
+ try {
+ createUserCodeException =
+ new MethodDescription.ForLoadedMethod(
+ UserCodeException.class.getDeclaredMethod("wrap", Throwable.class));
+ } catch (NoSuchMethodException | SecurityException e) {
+ throw new RuntimeException("Unable to find UserCodeException.wrap", e);
+ }
+
+ return new StackManipulation() {
+ @Override
+ public boolean isValid() {
+ return tryBody.isValid();
+ }
+
+ @Override
+ public Size apply(MethodVisitor mv, Implementation.Context implementationContext) {
+ Label tryBlockStart = new Label();
+ Label tryBlockEnd = new Label();
+ Label catchBlockStart = new Label();
+ Label catchBlockEnd = new Label();
+
+ String throwableName = new TypeDescription.ForLoadedType(Throwable.class).getInternalName();
+ mv.visitTryCatchBlock(tryBlockStart, tryBlockEnd, catchBlockStart, throwableName);
+
+ // The try block performs the wrapped operations and, on success, jumps past the handler
+ mv.visitLabel(tryBlockStart);
+ Size trySize = tryBody.apply(mv, implementationContext);
+ mv.visitJumpInsn(Opcodes.GOTO, catchBlockEnd);
+ mv.visitLabel(tryBlockEnd);
+
+ // The handler wraps the exception, and then throws.
+ mv.visitLabel(catchBlockStart);
+ // Add the exception to the frame
+ mv.visitFrame(
+ Opcodes.F_SAME1,
+ // No local variables
+ 0,
+ new Object[] {},
+ // 1 stack element (the throwable)
+ 1,
+ new Object[] {throwableName});
+
+ Size catchSize =
+ new Compound(MethodInvocation.invoke(createUserCodeException), Throw.INSTANCE)
+ .apply(mv, implementationContext);
+
+ mv.visitLabel(catchBlockEnd);
+ // The frame contents after the try/catch block is the same
+ // as it was before.
+ mv.visitFrame(
+ Opcodes.F_SAME,
+ // No local variables
+ 0,
+ new Object[] {},
+ // No new stack variables
+ 0,
+ new Object[] {});
+
+ return new Size(
+ trySize.getSizeImpact(),
+ Math.max(trySize.getMaximalSize(), catchSize.getMaximalSize()));
+ }
+ };
+ }
+
+ /**
+ * A constructor {@link Implementation} for a {@link DoFnInvoker} class. Produces the byte code
+ * for a constructor that takes a single argument and assigns it to the delegate field.
+ */
+ private static final class InvokerConstructor implements Implementation {
+ @Override
+ public InstrumentedType prepare(InstrumentedType instrumentedType) {
+ return instrumentedType;
+ }
+
+ @Override
+ public ByteCodeAppender appender(final Target implementationTarget) {
+ return new ByteCodeAppender() {
+ @Override
+ public Size apply(
+ MethodVisitor methodVisitor,
+ Context implementationContext,
+ MethodDescription instrumentedMethod) {
+ StackManipulation.Size size =
+ new StackManipulation.Compound(
+ // Load the this reference
+ MethodVariableAccess.REFERENCE.loadOffset(0),
+ // Invoke the super constructor (default constructor of Object)
+ MethodInvocation.invoke(
+ new TypeDescription.ForLoadedType(Object.class)
+ .getDeclaredMethods()
+ .filter(
+ ElementMatchers.isConstructor()
+ .and(ElementMatchers.takesArguments(0)))
+ .getOnly()),
+ // Load the this reference
+ MethodVariableAccess.REFERENCE.loadOffset(0),
+ // Load the delegate argument
+ MethodVariableAccess.REFERENCE.loadOffset(1),
+ // Assign the delegate argument to the delegate field
+ FieldAccess.forField(
+ implementationTarget
+ .getInstrumentedType()
+ .getDeclaredFields()
+ .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
+ .getOnly())
+ .putter(),
+ // Return void.
+ MethodReturn.VOID)
+ .apply(methodVisitor, implementationContext);
+ return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
+ }
+ };
+ }
+ }
+}