You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/08/17 23:05:26 UTC
[2/4] incubator-beam git commit: Rewrites DoFnReflector to go via
DoFnSignature
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
new file mode 100644
index 0000000..6730140
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+import com.google.auto.value.AutoValue;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+/**
+ * Describes the signature of a {@link DoFn}, in particular, which features it uses, which extra
+ * context it requires, types of the input and output elements, etc.
+ *
+ * <p>See <a href="https://s.apache.org/a-new-dofn">A new DoFn</a>.
+ */
+@AutoValue
+public abstract class DoFnSignature {
+ public abstract Class<? extends DoFn> fnClass();
+
+ public abstract ProcessElementMethod processElement();
+
+ @Nullable
+ public abstract BundleMethod startBundle();
+
+ @Nullable
+ public abstract BundleMethod finishBundle();
+
+ @Nullable
+ public abstract LifecycleMethod setup();
+
+ @Nullable
+ public abstract LifecycleMethod teardown();
+
+ static DoFnSignature create(
+ Class<? extends DoFn> fnClass,
+ ProcessElementMethod processElement,
+ @Nullable BundleMethod startBundle,
+ @Nullable BundleMethod finishBundle,
+ @Nullable LifecycleMethod setup,
+ @Nullable LifecycleMethod teardown) {
+ return new AutoValue_DoFnSignature(
+ fnClass,
+ processElement,
+ startBundle,
+ finishBundle,
+ setup,
+ teardown);
+ }
+
+ /** Describes a {@link DoFn.ProcessElement} method. */
+ @AutoValue
+ public abstract static class ProcessElementMethod {
+ enum Parameter {
+ BOUNDED_WINDOW,
+ INPUT_PROVIDER,
+ OUTPUT_RECEIVER
+ }
+
+ public abstract Method targetMethod();
+
+ public abstract List<Parameter> extraParameters();
+
+ static ProcessElementMethod create(Method targetMethod, List<Parameter> extraParameters) {
+ return new AutoValue_DoFnSignature_ProcessElementMethod(
+ targetMethod, Collections.unmodifiableList(extraParameters));
+ }
+
+ /** @return true if the reflected {@link DoFn} uses a Single Window. */
+ public boolean usesSingleWindow() {
+ return extraParameters().contains(Parameter.BOUNDED_WINDOW);
+ }
+ }
+
+ /** Describes a {@link DoFn.StartBundle} or {@link DoFn.FinishBundle} method. */
+ @AutoValue
+ public abstract static class BundleMethod {
+ public abstract Method targetMethod();
+
+ static BundleMethod create(Method targetMethod) {
+ return new AutoValue_DoFnSignature_BundleMethod(targetMethod);
+ }
+ }
+
+ /** Describes a {@link DoFn.Setup} or {@link DoFn.Teardown} method. */
+ @AutoValue
+ public abstract static class LifecycleMethod {
+ public abstract Method targetMethod();
+
+ static LifecycleMethod create(Method targetMethod) {
+ return new AutoValue_DoFnSignature_LifecycleMethod(targetMethod);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
new file mode 100644
index 0000000..80b3b4f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.reflect.TypeParameter;
+import com.google.common.reflect.TypeToken;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parses a {@link DoFn} and computes its {@link DoFnSignature}. See {@link #getOrParseSignature}.
+ */
+public class DoFnSignatures {
+ public static final DoFnSignatures INSTANCE = new DoFnSignatures();
+
+ private DoFnSignatures() {}
+
+ private final Map<Class<?>, DoFnSignature> signatureCache = new LinkedHashMap<>();
+
+ /** @return the {@link DoFnSignature} for the given {@link DoFn}. */
+ public synchronized DoFnSignature getOrParseSignature(
+ @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) {
+ DoFnSignature signature = signatureCache.get(fn);
+ if (signature == null) {
+ signatureCache.put(fn, signature = parseSignature(fn));
+ }
+ return signature;
+ }
+
+ /** Analyzes a given {@link DoFn} class and extracts its {@link DoFnSignature}. */
+ private static DoFnSignature parseSignature(Class<? extends DoFn> fnClass) {
+ TypeToken<?> inputT = null;
+ TypeToken<?> outputT = null;
+
+ // Extract the input and output type.
+ checkArgument(
+ DoFn.class.isAssignableFrom(fnClass),
+ "%s must be subtype of DoFn",
+ fnClass.getSimpleName());
+ TypeToken<? extends DoFn> fnToken = TypeToken.of(fnClass);
+ for (TypeToken<?> supertype : fnToken.getTypes()) {
+ if (!supertype.getRawType().equals(DoFn.class)) {
+ continue;
+ }
+ Type[] args = ((ParameterizedType) supertype.getType()).getActualTypeArguments();
+ inputT = TypeToken.of(args[0]);
+ outputT = TypeToken.of(args[1]);
+ }
+ checkNotNull(inputT, "Unable to determine input type from %s", fnClass);
+
+ Method processElementMethod = findAnnotatedMethod(DoFn.ProcessElement.class, fnClass, true);
+ Method startBundleMethod = findAnnotatedMethod(DoFn.StartBundle.class, fnClass, false);
+ Method finishBundleMethod = findAnnotatedMethod(DoFn.FinishBundle.class, fnClass, false);
+ Method setupMethod = findAnnotatedMethod(DoFn.Setup.class, fnClass, false);
+ Method teardownMethod = findAnnotatedMethod(DoFn.Teardown.class, fnClass, false);
+
+ return DoFnSignature.create(
+ fnClass,
+ analyzeProcessElementMethod(fnToken, processElementMethod, inputT, outputT),
+ (startBundleMethod == null)
+ ? null
+ : analyzeBundleMethod(fnToken, startBundleMethod, inputT, outputT),
+ (finishBundleMethod == null)
+ ? null
+ : analyzeBundleMethod(fnToken, finishBundleMethod, inputT, outputT),
+ (setupMethod == null) ? null : analyzeLifecycleMethod(setupMethod),
+ (teardownMethod == null) ? null : analyzeLifecycleMethod(teardownMethod));
+ }
+
+ /**
+ * Generates a type token for {@code DoFn<InputT, OutputT>.ProcessContext} given {@code InputT}
+ * and {@code OutputT}.
+ */
+ private static <InputT, OutputT>
+ TypeToken<DoFn<InputT, OutputT>.ProcessContext> doFnProcessContextTypeOf(
+ TypeToken<InputT> inputT, TypeToken<OutputT> outputT) {
+ return new TypeToken<DoFn<InputT, OutputT>.ProcessContext>() {}.where(
+ new TypeParameter<InputT>() {}, inputT)
+ .where(new TypeParameter<OutputT>() {}, outputT);
+ }
+
+ /**
+ * Generates a type token for {@code DoFn<InputT, OutputT>.Context} given {@code InputT} and
+ * {@code OutputT}.
+ */
+ private static <InputT, OutputT> TypeToken<DoFn<InputT, OutputT>.Context> doFnContextTypeOf(
+ TypeToken<InputT> inputT, TypeToken<OutputT> outputT) {
+ return new TypeToken<DoFn<InputT, OutputT>.Context>() {}.where(
+ new TypeParameter<InputT>() {}, inputT)
+ .where(new TypeParameter<OutputT>() {}, outputT);
+ }
+
+ /** Generates a type token for {@code DoFn.InputProvider<InputT>} given {@code InputT}. */
+ private static <InputT> TypeToken<DoFn.InputProvider<InputT>> inputProviderTypeOf(
+ TypeToken<InputT> inputT) {
+ return new TypeToken<DoFn.InputProvider<InputT>>() {}.where(
+ new TypeParameter<InputT>() {}, inputT);
+ }
+
+ /** Generates a type token for {@code DoFn.OutputReceiver<OutputT>} given {@code OutputT}. */
+ private static <OutputT> TypeToken<DoFn.OutputReceiver<OutputT>> outputReceiverTypeOf(
+ TypeToken<OutputT> inputT) {
+ return new TypeToken<DoFn.OutputReceiver<OutputT>>() {}.where(
+ new TypeParameter<OutputT>() {}, inputT);
+ }
+
+ @VisibleForTesting
+ static DoFnSignature.ProcessElementMethod analyzeProcessElementMethod(
+ TypeToken<? extends DoFn> fnClass, Method m, TypeToken<?> inputT, TypeToken<?> outputT) {
+ checkArgument(
+ void.class.equals(m.getReturnType()), "%s must have a void return type", format(m));
+ checkArgument(!m.isVarArgs(), "%s must not have var args", format(m));
+
+ TypeToken<?> processContextToken = doFnProcessContextTypeOf(inputT, outputT);
+
+ Type[] params = m.getGenericParameterTypes();
+ TypeToken<?> contextToken = null;
+ if (params.length > 0) {
+ contextToken = fnClass.resolveType(params[0]);
+ }
+ checkArgument(
+ contextToken != null && contextToken.equals(processContextToken),
+ "%s must take a %s as its first argument",
+ format(m),
+ formatType(processContextToken));
+
+ List<DoFnSignature.ProcessElementMethod.Parameter> extraParameters = new ArrayList<>();
+ TypeToken<?> expectedInputProviderT = inputProviderTypeOf(inputT);
+ TypeToken<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
+ for (int i = 1; i < params.length; ++i) {
+ TypeToken<?> param = fnClass.resolveType(params[i]);
+ Class<?> rawType = param.getRawType();
+ if (rawType.equals(BoundedWindow.class)) {
+ checkArgument(
+ !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW),
+ "Multiple BoundedWindow parameters in %s",
+ format(m));
+ extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW);
+ } else if (rawType.equals(DoFn.InputProvider.class)) {
+ checkArgument(
+ !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER),
+ "Multiple InputProvider parameters in %s",
+ format(m));
+ checkArgument(
+ param.equals(expectedInputProviderT),
+ "Wrong type of InputProvider parameter for method %s: %s, should be %s",
+ format(m),
+ formatType(param),
+ formatType(expectedInputProviderT));
+ extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER);
+ } else if (rawType.equals(DoFn.OutputReceiver.class)) {
+ checkArgument(
+ !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER),
+ "Multiple OutputReceiver parameters in %s",
+ format(m));
+ checkArgument(
+ param.equals(expectedOutputReceiverT),
+ "Wrong type of OutputReceiver parameter for method %s: %s, should be %s",
+ format(m),
+ formatType(param),
+ formatType(expectedOutputReceiverT));
+ extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER);
+ } else {
+ List<String> allowedParamTypes =
+ Arrays.asList(formatType(new TypeToken<BoundedWindow>() {}));
+ checkArgument(
+ false,
+ "%s is not a valid context parameter for method %s. Should be one of %s",
+ formatType(param),
+ format(m),
+ allowedParamTypes);
+ }
+ }
+
+ return DoFnSignature.ProcessElementMethod.create(m, extraParameters);
+ }
+
+ @VisibleForTesting
+ static DoFnSignature.BundleMethod analyzeBundleMethod(
+ TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT, TypeToken<?> outputT) {
+ checkArgument(
+ void.class.equals(m.getReturnType()), "%s must have a void return type", format(m));
+ checkArgument(!m.isVarArgs(), "%s must not have var args", format(m));
+
+ TypeToken<?> expectedContextToken = doFnContextTypeOf(inputT, outputT);
+
+ Type[] params = m.getGenericParameterTypes();
+ checkArgument(
+ params.length == 1,
+ "%s must have a single argument of type %s",
+ format(m),
+ formatType(expectedContextToken));
+ TypeToken<?> contextToken = fnToken.resolveType(params[0]);
+ checkArgument(
+ contextToken.equals(expectedContextToken),
+ "Wrong type of context argument to %s: %s, must be %s",
+ format(m),
+ formatType(contextToken),
+ formatType(expectedContextToken));
+
+ return DoFnSignature.BundleMethod.create(m);
+ }
+
+ private static DoFnSignature.LifecycleMethod analyzeLifecycleMethod(Method m) {
+ checkArgument(
+ void.class.equals(m.getReturnType()), "%s must have a void return type", format(m));
+ checkArgument(
+ m.getGenericParameterTypes().length == 0, "%s must take zero arguments", format(m));
+ return DoFnSignature.LifecycleMethod.create(m);
+ }
+
+ private static Collection<Method> declaredMethodsWithAnnotation(
+ Class<? extends Annotation> anno, Class<?> startClass, Class<?> stopClass) {
+ Collection<Method> matches = new ArrayList<>();
+
+ Class<?> clazz = startClass;
+ LinkedHashSet<Class<?>> interfaces = new LinkedHashSet<>();
+
+ // First, find all declared methods on the startClass and parents (up to stopClass)
+ while (clazz != null && !clazz.equals(stopClass)) {
+ for (Method method : clazz.getDeclaredMethods()) {
+ if (method.isAnnotationPresent(anno)) {
+ matches.add(method);
+ }
+ }
+
+ Collections.addAll(interfaces, clazz.getInterfaces());
+
+ clazz = clazz.getSuperclass();
+ }
+
+ // Now, iterate over all the discovered interfaces
+ for (Method method : ReflectHelpers.getClosureOfMethodsOnInterfaces(interfaces)) {
+ if (method.isAnnotationPresent(anno)) {
+ matches.add(method);
+ }
+ }
+ return matches;
+ }
+
+ private static Method findAnnotatedMethod(
+ Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
+ Collection<Method> matches = declaredMethodsWithAnnotation(anno, fnClazz, DoFn.class);
+
+ if (matches.size() == 0) {
+ checkArgument(
+ !required,
+ "No method annotated with @%s found in %s",
+ anno.getSimpleName(),
+ fnClazz.getName());
+ return null;
+ }
+
+ // If we have at least one match, then either it should be the only match
+ // or it should be an extension of the other matches (which came from parent
+ // classes).
+ Method first = matches.iterator().next();
+ for (Method other : matches) {
+ checkArgument(
+ first.getName().equals(other.getName())
+ && Arrays.equals(first.getParameterTypes(), other.getParameterTypes()),
+ "Found multiple methods annotated with @%s. [%s] and [%s]",
+ anno.getSimpleName(),
+ format(first),
+ format(other));
+ }
+
+ // We need to be able to call it. We require it is public.
+ checkArgument(
+ (first.getModifiers() & Modifier.PUBLIC) != 0, "%s must be public", format(first));
+
+ // And make sure its not static.
+ checkArgument(
+ (first.getModifiers() & Modifier.STATIC) == 0, "%s must not be static", format(first));
+
+ return first;
+ }
+
+ private static String format(Method m) {
+ return ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(m);
+ }
+
+ private static String formatType(TypeToken<?> t) {
+ return ReflectHelpers.TYPE_SIMPLE_DESCRIPTION.apply(t.getType());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
new file mode 100644
index 0000000..4df5209
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines reflection-based utilities for analyzing {@link org.apache.beam.sdk.transforms.DoFn}'s
+ * and creating {@link org.apache.beam.sdk.transforms.reflect.DoFnSignature}'s and
+ * {@link org.apache.beam.sdk.transforms.reflect.DoFnInvoker}'s from them.
+ */
+package org.apache.beam.sdk.transforms.reflect;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
deleted file mode 100644
index e05e5e2..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
+++ /dev/null
@@ -1,822 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.DoFn.Context;
-import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
-import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
-import org.apache.beam.sdk.transforms.DoFn.Setup;
-import org.apache.beam.sdk.transforms.DoFn.Teardown;
-import org.apache.beam.sdk.transforms.dofnreflector.DoFnReflectorTestHelper;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.UserCodeException;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-import java.lang.reflect.Method;
-
-/**
- * Tests for {@link DoFnReflector}.
- */
-@RunWith(JUnit4.class)
-public class DoFnReflectorTest {
-
- /**
- * A convenience struct holding flags that indicate whether a particular method was invoked.
- */
- public static class Invocations {
- public boolean wasProcessElementInvoked = false;
- public boolean wasStartBundleInvoked = false;
- public boolean wasFinishBundleInvoked = false;
- public boolean wasSetupInvoked = false;
- public boolean wasTeardownInvoked = false;
- private final String name;
-
- public Invocations(String name) {
- this.name = name;
- }
- }
-
- private DoFn<String, String> fn;
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Mock
- private DoFn<String, String>.ProcessContext mockContext;
- @Mock
- private BoundedWindow mockWindow;
- @Mock
- private DoFn.InputProvider<String> mockInputProvider;
- @Mock
- private DoFn.OutputReceiver<String> mockOutputReceiver;
-
- private ExtraContextFactory<String, String> extraContextFactory;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- this.extraContextFactory = new ExtraContextFactory<String, String>() {
- @Override
- public BoundedWindow window() {
- return mockWindow;
- }
-
- @Override
- public DoFn.InputProvider<String> inputProvider() {
- return mockInputProvider;
- }
-
- @Override
- public DoFn.OutputReceiver<String> outputReceiver() {
- return mockOutputReceiver;
- }
- };
- }
-
- private DoFnReflector underTest(DoFn<String, String> fn) {
- this.fn = fn;
- return DoFnReflector.of(fn.getClass());
- }
-
- private void checkInvokeProcessElementWorks(
- DoFnReflector r, Invocations... invocations) throws Exception {
- assertTrue("Need at least one invocation to check", invocations.length >= 1);
- for (Invocations invocation : invocations) {
- assertFalse("Should not yet have called processElement on " + invocation.name,
- invocation.wasProcessElementInvoked);
- }
- r.bindInvoker(fn).invokeProcessElement(mockContext, extraContextFactory);
- for (Invocations invocation : invocations) {
- assertTrue("Should have called processElement on " + invocation.name,
- invocation.wasProcessElementInvoked);
- }
- }
-
- private void checkInvokeStartBundleWorks(
- DoFnReflector r, Invocations... invocations) throws Exception {
- assertTrue("Need at least one invocation to check", invocations.length >= 1);
- for (Invocations invocation : invocations) {
- assertFalse("Should not yet have called startBundle on " + invocation.name,
- invocation.wasStartBundleInvoked);
- }
- r.bindInvoker(fn).invokeStartBundle(mockContext, extraContextFactory);
- for (Invocations invocation : invocations) {
- assertTrue("Should have called startBundle on " + invocation.name,
- invocation.wasStartBundleInvoked);
- }
- }
-
- private void checkInvokeFinishBundleWorks(
- DoFnReflector r, Invocations... invocations) throws Exception {
- assertTrue("Need at least one invocation to check", invocations.length >= 1);
- for (Invocations invocation : invocations) {
- assertFalse("Should not yet have called finishBundle on " + invocation.name,
- invocation.wasFinishBundleInvoked);
- }
- r.bindInvoker(fn).invokeFinishBundle(mockContext, extraContextFactory);
- for (Invocations invocation : invocations) {
- assertTrue("Should have called finishBundle on " + invocation.name,
- invocation.wasFinishBundleInvoked);
- }
- }
-
- private void checkInvokeSetupWorks(DoFnReflector r, Invocations... invocations) throws Exception {
- assertTrue("Need at least one invocation to check", invocations.length >= 1);
- for (Invocations invocation : invocations) {
- assertFalse("Should not yet have called setup on " + invocation.name,
- invocation.wasSetupInvoked);
- }
- r.bindInvoker(fn).invokeSetup();
- for (Invocations invocation : invocations) {
- assertTrue("Should have called setup on " + invocation.name,
- invocation.wasSetupInvoked);
- }
- }
-
- private void checkInvokeTeardownWorks(DoFnReflector r, Invocations... invocations)
- throws Exception {
- assertTrue("Need at least one invocation to check", invocations.length >= 1);
- for (Invocations invocation : invocations) {
- assertFalse("Should not yet have called teardown on " + invocation.name,
- invocation.wasTeardownInvoked);
- }
- r.bindInvoker(fn).invokeTeardown();
- for (Invocations invocation : invocations) {
- assertTrue("Should have called teardown on " + invocation.name,
- invocation.wasTeardownInvoked);
- }
- }
-
- @Test
- public void testDoFnWithNoExtraContext() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFnReflector reflector = underTest(new DoFn<String, String>() {
-
- @ProcessElement
- public void processElement(ProcessContext c)
- throws Exception {
- invocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- }
- });
-
- assertFalse(reflector.usesSingleWindow());
-
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testDoFnInvokersReused() throws Exception {
- // Ensures that we don't create a new Invoker class for every instance of the OldDoFn.
- IdentityParent fn1 = new IdentityParent();
- IdentityParent fn2 = new IdentityParent();
- DoFnReflector reflector1 = underTest(fn1);
- DoFnReflector reflector2 = underTest(fn2);
- assertSame("DoFnReflector instances should be cached and reused for identical types",
- reflector1, reflector2);
- assertSame("Invoker classes should only be generated once for each type",
- reflector1.bindInvoker(fn1).getClass(),
- reflector2.bindInvoker(fn2).getClass());
- }
-
- interface InterfaceWithProcessElement {
- @ProcessElement
- void processElement(DoFn<String, String>.ProcessContext c);
- }
-
- interface LayersOfInterfaces extends InterfaceWithProcessElement {}
-
- private class IdentityUsingInterfaceWithProcessElement
- extends DoFn<String, String>
- implements LayersOfInterfaces {
-
- private Invocations invocations = new Invocations("Named Class");
-
- @Override
- public void processElement(DoFn<String, String>.ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- }
- }
-
- @Test
- public void testDoFnWithProcessElementInterface() throws Exception {
- IdentityUsingInterfaceWithProcessElement fn = new IdentityUsingInterfaceWithProcessElement();
- DoFnReflector reflector = underTest(fn);
- assertFalse(reflector.usesSingleWindow());
- checkInvokeProcessElementWorks(reflector, fn.invocations);
- }
-
- private class IdentityParent extends DoFn<String, String> {
- protected Invocations parentInvocations = new Invocations("IdentityParent");
-
- @ProcessElement
- public void process(ProcessContext c) {
- parentInvocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- }
- }
-
- private class IdentityChildWithoutOverride extends IdentityParent {
- }
-
- private class IdentityChildWithOverride extends IdentityParent {
- protected Invocations childInvocations = new Invocations("IdentityChildWithOverride");
-
- @Override
- public void process(DoFn<String, String>.ProcessContext c) {
- super.process(c);
- childInvocations.wasProcessElementInvoked = true;
- }
- }
-
- @Test
- public void testDoFnWithMethodInSuperclass() throws Exception {
- IdentityChildWithoutOverride fn = new IdentityChildWithoutOverride();
- DoFnReflector reflector = underTest(fn);
- assertFalse(reflector.usesSingleWindow());
- checkInvokeProcessElementWorks(reflector, fn.parentInvocations);
- }
-
- @Test
- public void testDoFnWithMethodInSubclass() throws Exception {
- IdentityChildWithOverride fn = new IdentityChildWithOverride();
- DoFnReflector reflector = underTest(fn);
- assertFalse(reflector.usesSingleWindow());
- checkInvokeProcessElementWorks(reflector, fn.parentInvocations, fn.childInvocations);
- }
-
- @Test
- public void testDoFnWithWindow() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFnReflector reflector = underTest(new DoFn<String, String>() {
-
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow w)
- throws Exception {
- invocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- assertSame(w, mockWindow);
- }
- });
-
- assertTrue(reflector.usesSingleWindow());
-
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testDoFnWithOutputReceiver() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFnReflector reflector = underTest(new DoFn<String, String>() {
-
- @ProcessElement
- public void processElement(ProcessContext c, DoFn.OutputReceiver<String> o)
- throws Exception {
- invocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- assertSame(o, mockOutputReceiver);
- }
- });
-
- assertFalse(reflector.usesSingleWindow());
-
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testDoFnWithInputProvider() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFnReflector reflector = underTest(new DoFn<String, String>() {
-
- @ProcessElement
- public void processElement(ProcessContext c, DoFn.InputProvider<String> i)
- throws Exception {
- invocations.wasProcessElementInvoked = true;
- assertSame(c, mockContext);
- assertSame(i, mockInputProvider);
- }
- });
-
- assertFalse(reflector.usesSingleWindow());
-
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testDoFnWithStartBundle() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFnReflector reflector = underTest(new DoFn<String, String>() {
- @ProcessElement
- public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
-
- @StartBundle
- public void startBundle(Context c) {
- invocations.wasStartBundleInvoked = true;
- assertSame(c, mockContext);
- }
-
- @FinishBundle
- public void finishBundle(Context c) {
- invocations.wasFinishBundleInvoked = true;
- assertSame(c, mockContext);
- }
- });
-
- checkInvokeStartBundleWorks(reflector, invocations);
- checkInvokeFinishBundleWorks(reflector, invocations);
- }
-
- @Test
- public void testDoFnWithSetupTeardown() throws Exception {
- final Invocations invocations = new Invocations("AnonymousClass");
- DoFnReflector reflector = underTest(new DoFn<String, String>() {
- @ProcessElement
- public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
-
- @StartBundle
- public void startBundle(Context c) {
- invocations.wasStartBundleInvoked = true;
- assertSame(c, mockContext);
- }
-
- @FinishBundle
- public void finishBundle(Context c) {
- invocations.wasFinishBundleInvoked = true;
- assertSame(c, mockContext);
- }
-
- @Setup
- public void before() {
- invocations.wasSetupInvoked = true;
- }
-
- @Teardown
- public void after() {
- invocations.wasTeardownInvoked = true;
- }
- });
-
- checkInvokeSetupWorks(reflector, invocations);
- checkInvokeTeardownWorks(reflector, invocations);
- }
-
- @Test
- public void testNoProcessElement() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("No method annotated with @ProcessElement found");
- thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFn<String, String>() {});
- }
-
- @Test
- public void testMultipleProcessElement() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Found multiple methods annotated with @ProcessElement");
- thrown.expectMessage("foo()");
- thrown.expectMessage("bar()");
- thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFn<String, String>() {
- @ProcessElement
- public void foo() {}
-
- @ProcessElement
- public void bar() {}
- });
- }
-
- @Test
- public void testMultipleStartBundleElement() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Found multiple methods annotated with @StartBundle");
- thrown.expectMessage("bar()");
- thrown.expectMessage("baz()");
- thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFn<String, String>() {
- @ProcessElement
- public void foo() {}
-
- @StartBundle
- public void bar() {}
-
- @StartBundle
- public void baz() {}
- });
- }
-
- @Test
- public void testMultipleFinishBundleElement() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Found multiple methods annotated with @FinishBundle");
- thrown.expectMessage("bar()");
- thrown.expectMessage("baz()");
- thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFn<String, String>() {
- @ProcessElement
- public void foo() {}
-
- @FinishBundle
- public void bar() {}
-
- @FinishBundle
- public void baz() {}
- });
- }
-
- private static class PrivateDoFnClass extends DoFn<String, String> {
- final Invocations invocations = new Invocations(getClass().getName());
-
- @ProcessElement
- public void processThis(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- }
-
- @Test
- public void testLocalPrivateDoFnClass() throws Exception {
- PrivateDoFnClass fn = new PrivateDoFnClass();
- DoFnReflector reflector = underTest(fn);
- checkInvokeProcessElementWorks(reflector, fn.invocations);
- }
-
- @Test
- public void testStaticPackagePrivateDoFnClass() throws Exception {
- Invocations invocations = new Invocations("StaticPackagePrivateDoFn");
- DoFnReflector reflector =
- underTest(DoFnReflectorTestHelper.newStaticPackagePrivateDoFn(invocations));
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testInnerPackagePrivateDoFnClass() throws Exception {
- Invocations invocations = new Invocations("InnerPackagePrivateDoFn");
- DoFnReflector reflector =
- underTest(new DoFnReflectorTestHelper().newInnerPackagePrivateDoFn(invocations));
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testStaticPrivateDoFnClass() throws Exception {
- Invocations invocations = new Invocations("StaticPrivateDoFn");
- DoFnReflector reflector = underTest(DoFnReflectorTestHelper.newStaticPrivateDoFn(invocations));
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testInnerPrivateDoFnClass() throws Exception {
- Invocations invocations = new Invocations("StaticInnerDoFn");
- DoFnReflector reflector =
- underTest(new DoFnReflectorTestHelper().newInnerPrivateDoFn(invocations));
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testAnonymousInnerDoFnInOtherPackage() throws Exception {
- Invocations invocations = new Invocations("AnonymousInnerDoFnInOtherPackage");
- DoFnReflector reflector =
- underTest(new DoFnReflectorTestHelper().newInnerAnonymousDoFn(invocations));
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testStaticAnonymousDoFnInOtherPackage() throws Exception {
- Invocations invocations = new Invocations("AnonymousStaticDoFnInOtherPackage");
- DoFnReflector reflector =
- underTest(DoFnReflectorTestHelper.newStaticAnonymousDoFn(invocations));
- checkInvokeProcessElementWorks(reflector, invocations);
- }
-
- @Test
- public void testPrivateProcessElement() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("process() must be public");
- thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFn<String, String>() {
- @ProcessElement
- private void process() {}
- });
- }
-
- @Test
- public void testPrivateStartBundle() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("startBundle() must be public");
- thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFn<String, String>() {
- @ProcessElement
- public void processElement() {}
-
- @StartBundle
- void startBundle() {}
- });
- }
-
- @Test
- public void testPrivateFinishBundle() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("finishBundle() must be public");
- thrown.expectMessage(getClass().getName() + "$");
- underTest(new DoFn<String, String>() {
- @ProcessElement
- public void processElement() {}
-
- @FinishBundle
- void finishBundle() {}
- });
- }
-
- @SuppressWarnings({"unused"})
- private void missingProcessContext() {}
-
- @Test
- public void testMissingProcessContext() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage(getClass().getName()
- + "#missingProcessContext() must take a ProcessContext as its first argument");
-
- DoFnReflector.verifyProcessMethodArguments(
- getClass().getDeclaredMethod("missingProcessContext"));
- }
-
- @SuppressWarnings({"unused"})
- private void badProcessContext(String s) {}
-
- @Test
- public void testBadProcessContextType() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage(getClass().getName()
- + "#badProcessContext(String) must take a ProcessContext as its first argument");
-
- DoFnReflector.verifyProcessMethodArguments(
- getClass().getDeclaredMethod("badProcessContext", String.class));
- }
-
- @SuppressWarnings({"unused"})
- private void badExtraContext(DoFn<Integer, String>.Context c, int n) {}
-
- @Test
- public void testBadExtraContext() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage(
- "int is not a valid context parameter for method "
- + getClass().getName() + "#badExtraContext(Context, int). Should be one of [");
-
- DoFnReflector.verifyBundleMethodArguments(
- getClass().getDeclaredMethod("badExtraContext", Context.class, int.class));
- }
-
- @SuppressWarnings({"unused"})
- private void badExtraProcessContext(
- DoFn<Integer, String>.ProcessContext c, Integer n) {}
-
- @Test
- public void testBadExtraProcessContextType() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage(
- "Integer is not a valid context parameter for method "
- + getClass().getName() + "#badExtraProcessContext(ProcessContext, Integer)"
- + ". Should be one of [BoundedWindow]");
-
- DoFnReflector.verifyProcessMethodArguments(
- getClass().getDeclaredMethod("badExtraProcessContext",
- ProcessContext.class, Integer.class));
- }
-
- @SuppressWarnings("unused")
- private int badReturnType() {
- return 0;
- }
-
- @Test
- public void testBadReturnType() throws Exception {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage(getClass().getName() + "#badReturnType() must have a void return type");
-
- DoFnReflector.verifyProcessMethodArguments(getClass().getDeclaredMethod("badReturnType"));
- }
-
- @SuppressWarnings("unused")
- private void goodGenerics(
- DoFn<Integer, String>.ProcessContext c,
- DoFn.InputProvider<Integer> input,
- DoFn.OutputReceiver<String> output) {}
-
- @Test
- public void testValidGenerics() throws Exception {
- Method method =
- getClass()
- .getDeclaredMethod(
- "goodGenerics",
- DoFn.ProcessContext.class,
- DoFn.InputProvider.class,
- DoFn.OutputReceiver.class);
- DoFnReflector.verifyProcessMethodArguments(method);
- }
-
- @SuppressWarnings("unused")
- private void goodWildcards(
- DoFn<Integer, String>.ProcessContext c,
- DoFn.InputProvider<?> input,
- DoFn.OutputReceiver<?> output) {}
-
- @Test
- public void testGoodWildcards() throws Exception {
- Method method =
- getClass()
- .getDeclaredMethod(
- "goodWildcards",
- DoFn.ProcessContext.class,
- DoFn.InputProvider.class,
- DoFn.OutputReceiver.class);
- DoFnReflector.verifyProcessMethodArguments(method);
- }
-
- @SuppressWarnings("unused")
- private void goodBoundedWildcards(
- DoFn<Integer, String>.ProcessContext c,
- DoFn.InputProvider<? super Integer> input,
- DoFn.OutputReceiver<? super String> output) {}
-
- @Test
- public void testGoodBoundedWildcards() throws Exception {
- Method method =
- getClass()
- .getDeclaredMethod(
- "goodBoundedWildcards",
- DoFn.ProcessContext.class,
- DoFn.InputProvider.class,
- DoFn.OutputReceiver.class);
- DoFnReflector.verifyProcessMethodArguments(method);
- }
-
- @SuppressWarnings("unused")
- private <InputT, OutputT> void goodTypeVariables(
- DoFn<InputT, OutputT>.ProcessContext c,
- DoFn.InputProvider<InputT> input,
- DoFn.OutputReceiver<OutputT> output) {}
-
- @Test
- public void testGoodTypeVariables() throws Exception {
- Method method =
- getClass()
- .getDeclaredMethod(
- "goodTypeVariables",
- DoFn.ProcessContext.class,
- DoFn.InputProvider.class,
- DoFn.OutputReceiver.class);
- DoFnReflector.verifyProcessMethodArguments(method);
- }
-
- @SuppressWarnings("unused")
- private void badGenericTwoArgs(
- DoFn<Integer, String>.ProcessContext c,
- DoFn.InputProvider<Integer> input,
- DoFn.OutputReceiver<Integer> output) {}
-
- @Test
- public void testBadGenericsTwoArgs() throws Exception {
- Method method =
- getClass()
- .getDeclaredMethod(
- "badGenericTwoArgs",
- DoFn.ProcessContext.class,
- DoFn.InputProvider.class,
- DoFn.OutputReceiver.class);
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Incompatible generics in context parameter "
- + "OutputReceiver<Integer> "
- + "for method " + getClass().getName()
- + "#badGenericTwoArgs(ProcessContext, InputProvider, OutputReceiver). Should be "
- + "OutputReceiver<String>");
-
- DoFnReflector.verifyProcessMethodArguments(method);
- }
-
- @SuppressWarnings("unused")
- private void badGenericWildCards(
- DoFn<Integer, String>.ProcessContext c,
- DoFn.InputProvider<Integer> input,
- DoFn.OutputReceiver<? super Integer> output) {}
-
- @Test
- public void testBadGenericWildCards() throws Exception {
- Method method =
- getClass()
- .getDeclaredMethod(
- "badGenericWildCards",
- DoFn.ProcessContext.class,
- DoFn.InputProvider.class,
- DoFn.OutputReceiver.class);
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Incompatible generics in context parameter "
- + "OutputReceiver<? super Integer> for method "
- + getClass().getName()
- + "#badGenericWildCards(ProcessContext, InputProvider, OutputReceiver). Should be "
- + "OutputReceiver<String>");
-
- DoFnReflector.verifyProcessMethodArguments(method);
- }
-
- @SuppressWarnings("unused")
- private <InputT, OutputT> void badTypeVariables(DoFn<InputT, OutputT>.ProcessContext c,
- DoFn.InputProvider<InputT> input, DoFn.OutputReceiver<InputT> output) {}
-
- @Test
- public void testBadTypeVariables() throws Exception {
- Method method =
- getClass()
- .getDeclaredMethod(
- "badTypeVariables",
- DoFn.ProcessContext.class,
- DoFn.InputProvider.class,
- DoFn.OutputReceiver.class);
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Incompatible generics in context parameter "
- + "OutputReceiver<InputT> for method " + getClass().getName()
- + "#badTypeVariables(ProcessContext, InputProvider, OutputReceiver). Should be "
- + "OutputReceiver<OutputT>");
-
- DoFnReflector.verifyProcessMethodArguments(method);
- }
-
- @Test
- public void testProcessElementException() throws Exception {
- DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(@SuppressWarnings("unused") ProcessContext c) {
- throw new IllegalArgumentException("bogus");
- }
- };
-
- thrown.expect(UserCodeException.class);
- thrown.expectMessage("bogus");
- DoFnReflector.of(fn.getClass()).bindInvoker(fn).invokeProcessElement(null, null);
- }
-
- @Test
- public void testStartBundleException() throws Exception {
- DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
- @StartBundle
- public void startBundle(@SuppressWarnings("unused") Context c) {
- throw new IllegalArgumentException("bogus");
- }
-
- @ProcessElement
- public void processElement(@SuppressWarnings("unused") ProcessContext c) {
- }
- };
-
- thrown.expect(UserCodeException.class);
- thrown.expectMessage("bogus");
- DoFnReflector.of(fn.getClass()).bindInvoker(fn).invokeStartBundle(null, null);
- }
-
- @Test
- public void testFinishBundleException() throws Exception {
- DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
- @FinishBundle
- public void finishBundle(@SuppressWarnings("unused") Context c) {
- throw new IllegalArgumentException("bogus");
- }
-
- @ProcessElement
- public void processElement(@SuppressWarnings("unused") ProcessContext c) {
- }
- };
-
- thrown.expect(UserCodeException.class);
- thrown.expectMessage("bogus");
- DoFnReflector.of(fn.getClass()).bindInvoker(fn).invokeFinishBundle(null, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index 604536b..3469223 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -99,7 +99,7 @@ public class FlattenTest implements Serializable {
PCollection<String> output =
makePCollectionListOfStrings(p, inputs)
.apply(Flatten.<String>pCollections())
- .apply(ParDo.of(new IdentityFn<String>(){}));
+ .apply(ParDo.of(new IdentityFn<String>()));
PAssert.that(output).containsInAnyOrder(flattenLists(inputs));
p.run();
@@ -152,7 +152,7 @@ public class FlattenTest implements Serializable {
PCollection<String> output =
PCollectionList.<String>empty(p)
.apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of())
- .apply(ParDo.of(new IdentityFn<String>(){}));
+ .apply(ParDo.of(new IdentityFn<String>()));
PAssert.that(output).empty();
p.run();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java
deleted file mode 100644
index 90fba12..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.dofnreflector;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnReflectorTest.Invocations;
-
-/**
- * Test helper for DoFnReflectorTest, which needs to test package-private access
- * to DoFns in other packages.
- */
-public class DoFnReflectorTestHelper {
-
- private static class StaticPrivateDoFn extends DoFn<String, String> {
- final Invocations invocations;
-
- public StaticPrivateDoFn(Invocations invocations) {
- this.invocations = invocations;
- }
-
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- }
-
- private class InnerPrivateDoFn extends DoFn<String, String> {
- final Invocations invocations;
-
- public InnerPrivateDoFn(Invocations invocations) {
- this.invocations = invocations;
- }
-
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- }
-
- static class StaticPackagePrivateDoFn extends DoFn<String, String> {
- final Invocations invocations;
-
- public StaticPackagePrivateDoFn(Invocations invocations) {
- this.invocations = invocations;
- }
-
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- }
-
- class InnerPackagePrivateDoFn extends DoFn<String, String> {
- final Invocations invocations;
-
- public InnerPackagePrivateDoFn(Invocations invocations) {
- this.invocations = invocations;
- }
-
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- }
-
- public static DoFn<String, String> newStaticPackagePrivateDoFn(
- Invocations invocations) {
- return new StaticPackagePrivateDoFn(invocations);
- }
-
- public DoFn<String, String> newInnerPackagePrivateDoFn(Invocations invocations) {
- return new InnerPackagePrivateDoFn(invocations);
- }
-
- public static DoFn<String, String> newStaticPrivateDoFn(Invocations invocations) {
- return new StaticPrivateDoFn(invocations);
- }
-
- public DoFn<String, String> newInnerPrivateDoFn(Invocations invocations) {
- return new InnerPrivateDoFn(invocations);
- }
-
- public DoFn<String, String> newInnerAnonymousDoFn(final Invocations invocations) {
- return new DoFn<String, String>() {
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- };
- }
-
- public static DoFn<String, String> newStaticAnonymousDoFn(
- final Invocations invocations) {
- return new DoFn<String, String>() {
- @ProcessElement
- public void process(ProcessContext c) {
- invocations.wasProcessElementInvoked = true;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
new file mode 100644
index 0000000..7e756e2
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.UserCodeException;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link DoFnInvokers}. */
+public class DoFnInvokersTest {
+ /** A convenience struct holding flags that indicate whether a particular method was invoked. */
+ public static class Invocations {
+ public boolean wasProcessElementInvoked = false;
+ public boolean wasStartBundleInvoked = false;
+ public boolean wasFinishBundleInvoked = false;
+ public boolean wasSetupInvoked = false;
+ public boolean wasTeardownInvoked = false;
+ private final String name;
+
+ public Invocations(String name) {
+ this.name = name;
+ }
+ }
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Mock private DoFn.ProcessContext mockContext;
+ @Mock private BoundedWindow mockWindow;
+ @Mock private DoFn.InputProvider<String> mockInputProvider;
+ @Mock private DoFn.OutputReceiver<String> mockOutputReceiver;
+
+ private DoFn.ExtraContextFactory<String, String> extraContextFactory;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ this.extraContextFactory =
+ new DoFn.ExtraContextFactory<String, String>() {
+ @Override
+ public BoundedWindow window() {
+ return mockWindow;
+ }
+
+ @Override
+ public DoFn.InputProvider<String> inputProvider() {
+ return mockInputProvider;
+ }
+
+ @Override
+ public DoFn.OutputReceiver<String> outputReceiver() {
+ return mockOutputReceiver;
+ }
+ };
+ }
+
+ private void checkInvokeProcessElementWorks(DoFn<String, String> fn, Invocations... invocations)
+ throws Exception {
+ assertTrue("Need at least one invocation to check", invocations.length >= 1);
+ for (Invocations invocation : invocations) {
+ assertFalse(
+ "Should not yet have called processElement on " + invocation.name,
+ invocation.wasProcessElementInvoked);
+ }
+ DoFnInvokers.INSTANCE
+ .newByteBuddyInvoker(fn)
+ .invokeProcessElement(mockContext, extraContextFactory);
+ for (Invocations invocation : invocations) {
+ assertTrue(
+ "Should have called processElement on " + invocation.name,
+ invocation.wasProcessElementInvoked);
+ }
+ }
+
+ private void checkInvokeStartBundleWorks(DoFn<String, String> fn, Invocations... invocations)
+ throws Exception {
+ assertTrue("Need at least one invocation to check", invocations.length >= 1);
+ for (Invocations invocation : invocations) {
+ assertFalse(
+ "Should not yet have called startBundle on " + invocation.name,
+ invocation.wasStartBundleInvoked);
+ }
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeStartBundle(mockContext);
+ for (Invocations invocation : invocations) {
+ assertTrue(
+ "Should have called startBundle on " + invocation.name, invocation.wasStartBundleInvoked);
+ }
+ }
+
+ private void checkInvokeFinishBundleWorks(DoFn<String, String> fn, Invocations... invocations)
+ throws Exception {
+ assertTrue("Need at least one invocation to check", invocations.length >= 1);
+ for (Invocations invocation : invocations) {
+ assertFalse(
+ "Should not yet have called finishBundle on " + invocation.name,
+ invocation.wasFinishBundleInvoked);
+ }
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeFinishBundle(mockContext);
+ for (Invocations invocation : invocations) {
+ assertTrue(
+ "Should have called finishBundle on " + invocation.name,
+ invocation.wasFinishBundleInvoked);
+ }
+ }
+
+ private void checkInvokeSetupWorks(DoFn<String, String> fn, Invocations... invocations)
+ throws Exception {
+ assertTrue("Need at least one invocation to check", invocations.length >= 1);
+ for (Invocations invocation : invocations) {
+ assertFalse(
+ "Should not yet have called setup on " + invocation.name, invocation.wasSetupInvoked);
+ }
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeSetup();
+ for (Invocations invocation : invocations) {
+ assertTrue("Should have called setup on " + invocation.name, invocation.wasSetupInvoked);
+ }
+ }
+
+ private void checkInvokeTeardownWorks(DoFn<String, String> fn, Invocations... invocations)
+ throws Exception {
+ assertTrue("Need at least one invocation to check", invocations.length >= 1);
+ for (Invocations invocation : invocations) {
+ assertFalse(
+ "Should not yet have called teardown on " + invocation.name,
+ invocation.wasTeardownInvoked);
+ }
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeTeardown();
+ for (Invocations invocation : invocations) {
+ assertTrue(
+ "Should have called teardown on " + invocation.name, invocation.wasTeardownInvoked);
+ }
+ }
+
+ @Test
+ public void testDoFnWithNoExtraContext() throws Exception {
+ final Invocations invocations = new Invocations("AnonymousClass");
+ DoFn<String, String> fn =
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ invocations.wasProcessElementInvoked = true;
+ assertSame(c, mockContext);
+ }
+ };
+
+ assertFalse(
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(fn.getClass())
+ .processElement()
+ .usesSingleWindow());
+
+ checkInvokeProcessElementWorks(fn, invocations);
+ }
+
+ @Test
+ public void testDoFnInvokersReused() throws Exception {
+ // Ensures that we don't create a new Invoker class for every instance of the DoFn.
+ IdentityParent fn1 = new IdentityParent();
+ IdentityParent fn2 = new IdentityParent();
+ assertSame(
+ "Invoker classes should only be generated once for each type",
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn1).getClass(),
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn2).getClass());
+ }
+
+ interface InterfaceWithProcessElement {
+ @DoFn.ProcessElement
+ void processElement(DoFn<String, String>.ProcessContext c);
+ }
+
+ interface LayersOfInterfaces extends InterfaceWithProcessElement {}
+
+ private class IdentityUsingInterfaceWithProcessElement extends DoFn<String, String>
+ implements LayersOfInterfaces {
+
+ private Invocations invocations = new Invocations("Named Class");
+
+ @Override
+ public void processElement(DoFn<String, String>.ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ assertSame(c, mockContext);
+ }
+ }
+
+ @Test
+ public void testDoFnWithProcessElementInterface() throws Exception {
+ IdentityUsingInterfaceWithProcessElement fn = new IdentityUsingInterfaceWithProcessElement();
+ assertFalse(
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(fn.getClass())
+ .processElement()
+ .usesSingleWindow());
+ checkInvokeProcessElementWorks(fn, fn.invocations);
+ }
+
+ private class IdentityParent extends DoFn<String, String> {
+ protected Invocations parentInvocations = new Invocations("IdentityParent");
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ parentInvocations.wasProcessElementInvoked = true;
+ assertSame(c, mockContext);
+ }
+ }
+
+ private class IdentityChildWithoutOverride extends IdentityParent {}
+
+ private class IdentityChildWithOverride extends IdentityParent {
+ protected Invocations childInvocations = new Invocations("IdentityChildWithOverride");
+
+ @Override
+ public void process(DoFn<String, String>.ProcessContext c) {
+ super.process(c);
+ childInvocations.wasProcessElementInvoked = true;
+ }
+ }
+
+ @Test
+ public void testDoFnWithMethodInSuperclass() throws Exception {
+ IdentityChildWithoutOverride fn = new IdentityChildWithoutOverride();
+ assertFalse(
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(fn.getClass())
+ .processElement()
+ .usesSingleWindow());
+ checkInvokeProcessElementWorks(fn, fn.parentInvocations);
+ }
+
+ @Test
+ public void testDoFnWithMethodInSubclass() throws Exception {
+ IdentityChildWithOverride fn = new IdentityChildWithOverride();
+ assertFalse(
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(fn.getClass())
+ .processElement()
+ .usesSingleWindow());
+ checkInvokeProcessElementWorks(fn, fn.parentInvocations, fn.childInvocations);
+ }
+
+ @Test
+ public void testDoFnWithWindow() throws Exception {
+ final Invocations invocations = new Invocations("AnonymousClass");
+ DoFn<String, String> fn =
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow w) throws Exception {
+ invocations.wasProcessElementInvoked = true;
+ assertSame(c, mockContext);
+ assertSame(w, mockWindow);
+ }
+ };
+
+ assertTrue(
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(fn.getClass())
+ .processElement()
+ .usesSingleWindow());
+
+ checkInvokeProcessElementWorks(fn, invocations);
+ }
+
+ @Test
+ public void testDoFnWithOutputReceiver() throws Exception {
+ final Invocations invocations = new Invocations("AnonymousClass");
+ DoFn<String, String> fn =
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c, OutputReceiver<String> o) throws Exception {
+ invocations.wasProcessElementInvoked = true;
+ assertSame(c, mockContext);
+ assertSame(o, mockOutputReceiver);
+ }
+ };
+
+ assertFalse(
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(fn.getClass())
+ .processElement()
+ .usesSingleWindow());
+
+ checkInvokeProcessElementWorks(fn, invocations);
+ }
+
+ @Test
+ public void testDoFnWithInputProvider() throws Exception {
+ final Invocations invocations = new Invocations("AnonymousClass");
+ DoFn<String, String> fn =
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c, InputProvider<String> i) throws Exception {
+ invocations.wasProcessElementInvoked = true;
+ assertSame(c, mockContext);
+ assertSame(i, mockInputProvider);
+ }
+ };
+
+ assertFalse(
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(fn.getClass())
+ .processElement()
+ .usesSingleWindow());
+
+ checkInvokeProcessElementWorks(fn, invocations);
+ }
+
+ @Test
+ public void testDoFnWithStartBundle() throws Exception {
+ final Invocations invocations = new Invocations("AnonymousClass");
+ DoFn<String, String> fn =
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+
+ @StartBundle
+ public void startBundle(Context c) {
+ invocations.wasStartBundleInvoked = true;
+ assertSame(c, mockContext);
+ }
+
+ @FinishBundle
+ public void finishBundle(Context c) {
+ invocations.wasFinishBundleInvoked = true;
+ assertSame(c, mockContext);
+ }
+ };
+
+ checkInvokeStartBundleWorks(fn, invocations);
+ checkInvokeFinishBundleWorks(fn, invocations);
+ }
+
+ @Test
+ public void testDoFnWithSetupTeardown() throws Exception {
+ final Invocations invocations = new Invocations("AnonymousClass");
+ DoFn<String, String> fn =
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+
+ @StartBundle
+ public void startBundle(Context c) {
+ invocations.wasStartBundleInvoked = true;
+ assertSame(c, mockContext);
+ }
+
+ @FinishBundle
+ public void finishBundle(Context c) {
+ invocations.wasFinishBundleInvoked = true;
+ assertSame(c, mockContext);
+ }
+
+ @Setup
+ public void before() {
+ invocations.wasSetupInvoked = true;
+ }
+
+ @Teardown
+ public void after() {
+ invocations.wasTeardownInvoked = true;
+ }
+ };
+
+ checkInvokeSetupWorks(fn, invocations);
+ checkInvokeTeardownWorks(fn, invocations);
+ }
+
+ private static class PrivateDoFnClass extends DoFn<String, String> {
+ final Invocations invocations = new Invocations(getClass().getName());
+
+ @ProcessElement
+ public void processThis(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ }
+
+ @Test
+ public void testLocalPrivateDoFnClass() throws Exception {
+ PrivateDoFnClass fn = new PrivateDoFnClass();
+ checkInvokeProcessElementWorks(fn, fn.invocations);
+ }
+
+ @Test
+ public void testStaticPackagePrivateDoFnClass() throws Exception {
+ Invocations invocations = new Invocations("StaticPackagePrivateDoFn");
+ checkInvokeProcessElementWorks(
+ DoFnInvokersTestHelper.newStaticPackagePrivateDoFn(invocations), invocations);
+ }
+
+ @Test
+ public void testInnerPackagePrivateDoFnClass() throws Exception {
+ Invocations invocations = new Invocations("InnerPackagePrivateDoFn");
+ checkInvokeProcessElementWorks(
+ new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn(invocations), invocations);
+ }
+
+ @Test
+ public void testStaticPrivateDoFnClass() throws Exception {
+ Invocations invocations = new Invocations("StaticPrivateDoFn");
+ checkInvokeProcessElementWorks(
+ DoFnInvokersTestHelper.newStaticPrivateDoFn(invocations), invocations);
+ }
+
+ @Test
+ public void testInnerPrivateDoFnClass() throws Exception {
+ Invocations invocations = new Invocations("StaticInnerDoFn");
+ checkInvokeProcessElementWorks(
+ new DoFnInvokersTestHelper().newInnerPrivateDoFn(invocations), invocations);
+ }
+
+ @Test
+ public void testAnonymousInnerDoFnInOtherPackage() throws Exception {
+ Invocations invocations = new Invocations("AnonymousInnerDoFnInOtherPackage");
+ checkInvokeProcessElementWorks(
+ new DoFnInvokersTestHelper().newInnerAnonymousDoFn(invocations), invocations);
+ }
+
+ @Test
+ public void testStaticAnonymousDoFnInOtherPackage() throws Exception {
+ Invocations invocations = new Invocations("AnonymousStaticDoFnInOtherPackage");
+ checkInvokeProcessElementWorks(
+ DoFnInvokersTestHelper.newStaticAnonymousDoFn(invocations), invocations);
+ }
+
+ @Test
+ public void testProcessElementException() throws Exception {
+ DoFn<Integer, Integer> fn =
+ new DoFn<Integer, Integer>() {
+ @ProcessElement
+ public void processElement(@SuppressWarnings("unused") ProcessContext c) {
+ throw new IllegalArgumentException("bogus");
+ }
+ };
+
+ thrown.expect(UserCodeException.class);
+ thrown.expectMessage("bogus");
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeProcessElement(null, null);
+ }
+
+ @Test
+ public void testStartBundleException() throws Exception {
+ DoFn<Integer, Integer> fn =
+ new DoFn<Integer, Integer>() {
+ @StartBundle
+ public void startBundle(@SuppressWarnings("unused") Context c) {
+ throw new IllegalArgumentException("bogus");
+ }
+
+ @ProcessElement
+ public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+ };
+
+ thrown.expect(UserCodeException.class);
+ thrown.expectMessage("bogus");
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeStartBundle(null);
+ }
+
+ @Test
+ public void testFinishBundleException() throws Exception {
+ DoFn<Integer, Integer> fn =
+ new DoFn<Integer, Integer>() {
+ @FinishBundle
+ public void finishBundle(@SuppressWarnings("unused") Context c) {
+ throw new IllegalArgumentException("bogus");
+ }
+
+ @ProcessElement
+ public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
+ };
+
+ thrown.expect(UserCodeException.class);
+ thrown.expectMessage("bogus");
+ DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeFinishBundle(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
new file mode 100644
index 0000000..7bfdddc
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokersTest.Invocations;
+
+/**
+ * Test helper for {@link DoFnInvokersTest}, which needs to test package-private access
+ * to DoFns in other packages.
+ */
+public class DoFnInvokersTestHelper {
+
+ private static class StaticPrivateDoFn extends DoFn<String, String> {
+ final Invocations invocations;
+
+ public StaticPrivateDoFn(Invocations invocations) {
+ this.invocations = invocations;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ }
+
+ private class InnerPrivateDoFn extends DoFn<String, String> {
+ final Invocations invocations;
+
+ public InnerPrivateDoFn(Invocations invocations) {
+ this.invocations = invocations;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ }
+
+ static class StaticPackagePrivateDoFn extends DoFn<String, String> {
+ final Invocations invocations;
+
+ public StaticPackagePrivateDoFn(Invocations invocations) {
+ this.invocations = invocations;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ }
+
+ class InnerPackagePrivateDoFn extends DoFn<String, String> {
+ final Invocations invocations;
+
+ public InnerPackagePrivateDoFn(Invocations invocations) {
+ this.invocations = invocations;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ }
+
+ public static DoFn<String, String> newStaticPackagePrivateDoFn(
+ Invocations invocations) {
+ return new StaticPackagePrivateDoFn(invocations);
+ }
+
+ public DoFn<String, String> newInnerPackagePrivateDoFn(Invocations invocations) {
+ return new InnerPackagePrivateDoFn(invocations);
+ }
+
+ public static DoFn<String, String> newStaticPrivateDoFn(Invocations invocations) {
+ return new StaticPrivateDoFn(invocations);
+ }
+
+ public DoFn<String, String> newInnerPrivateDoFn(Invocations invocations) {
+ return new InnerPrivateDoFn(invocations);
+ }
+
+ public DoFn<String, String> newInnerAnonymousDoFn(final Invocations invocations) {
+ return new DoFn<String, String>() {
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ };
+ }
+
+ public static DoFn<String, String> newStaticAnonymousDoFn(
+ final Invocations invocations) {
+ return new DoFn<String, String>() {
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ };
+ }
+}