You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/11/07 19:59:14 UTC
[24/50] incubator-beam git commit: Refactor and reuse parameter
analysis in DoFnSignatures
Refactor and reuse parameter analysis in DoFnSignatures
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8bf6d92c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8bf6d92c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8bf6d92c
Branch: refs/heads/apex-runner
Commit: 8bf6d92cf35d11f4f3b02dae677a4fe778d34a61
Parents: 71fa7cd
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 31 21:30:40 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Nov 3 21:32:53 2016 -0700
----------------------------------------------------------------------
.../sdk/transforms/reflect/DoFnSignature.java | 21 +-
.../sdk/transforms/reflect/DoFnSignatures.java | 585 ++++++++++++-------
.../DoFnSignaturesProcessElementTest.java | 18 +-
.../DoFnSignaturesSplittableDoFnTest.java | 1 -
.../transforms/reflect/DoFnSignaturesTest.java | 35 +-
.../reflect/DoFnSignaturesTestUtils.java | 5 +-
6 files changed, 446 insertions(+), 219 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bf6d92c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 431de02..7087efa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -126,12 +126,25 @@ public abstract class DoFnSignature {
abstract DoFnSignature build();
}
- /** A method delegated to a annotated method of an underlying {@link DoFn}. */
+ /** A method delegated to an annotated method of an underlying {@link DoFn}. */
public interface DoFnMethod {
/** The annotated method itself. */
Method targetMethod();
}
+ /**
+ * A method delegated to an annotated method of an underlying {@link DoFn} that accepts a dynamic
+ * list of parameters.
+ */
+ public interface MethodWithExtraParameters extends DoFnMethod {
+ /**
+ * Types of optional parameters of the annotated method, in the order they appear.
+ *
+ * <p>Validation that these are allowed is external to this class.
+ */
+ List<Parameter> extraParameters();
+ }
+
/** A descriptor for an optional parameter of the {@link DoFn.ProcessElement} method. */
public abstract static class Parameter {
@@ -331,12 +344,13 @@ public abstract class DoFnSignature {
/** Describes a {@link DoFn.ProcessElement} method. */
@AutoValue
- public abstract static class ProcessElementMethod implements DoFnMethod {
+ public abstract static class ProcessElementMethod implements MethodWithExtraParameters {
/** The annotated method itself. */
@Override
public abstract Method targetMethod();
/** Types of optional parameters of the annotated method, in the order they appear. */
+ @Override
public abstract List<Parameter> extraParameters();
/** Concrete type of the {@link RestrictionTracker} parameter, if present. */
@@ -380,7 +394,7 @@ public abstract class DoFnSignature {
/** Describes a {@link DoFn.OnTimer} method. */
@AutoValue
- public abstract static class OnTimerMethod implements DoFnMethod {
+ public abstract static class OnTimerMethod implements MethodWithExtraParameters {
/** The id on the method's {@link DoFn.TimerId} annotation. */
public abstract String id();
@@ -390,6 +404,7 @@ public abstract class DoFnSignature {
public abstract Method targetMethod();
/** Types of optional parameters of the annotated method, in the order they appear. */
+ @Override
public abstract List<Parameter> extraParameters();
static OnTimerMethod create(Method targetMethod, String id, List<Parameter> extraParameters) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bf6d92c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index c690ace..0475404 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms.reflect;
import static com.google.common.base.Preconditions.checkState;
+import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
@@ -41,9 +42,11 @@ import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.StateId;
import org.apache.beam.sdk.transforms.DoFn.TimerId;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -81,13 +84,134 @@ public class DoFnSignatures {
return signature;
}
+ /**
+ * The context for a {@link DoFn} class, for use in analysis.
+ *
+ * <p>It contains much of the information that eventually becomes part of the {@link
+ * DoFnSignature}, but in an intermediate state.
+ */
+ @VisibleForTesting
+ static class FnAnalysisContext {
+
+ private final Map<String, StateDeclaration> stateDeclarations = new HashMap<>();
+ private final Map<String, TimerDeclaration> timerDeclarations = new HashMap<>();
+
+ private FnAnalysisContext() {}
+
+ /** Create an empty context, with no declarations. */
+ public static FnAnalysisContext create() {
+ return new FnAnalysisContext();
+ }
+
+ /** State parameters declared in this context, keyed by {@link StateId}. Unmodifiable. */
+ public Map<String, StateDeclaration> getStateDeclarations() {
+ return Collections.unmodifiableMap(stateDeclarations);
+ }
+
+ /** Timer parameters declared in this context, keyed by {@link TimerId}. Unmodifiable. */
+ public Map<String, TimerDeclaration> getTimerDeclarations() {
+ return Collections.unmodifiableMap(timerDeclarations);
+ }
+
+ public void addStateDeclaration(StateDeclaration decl) {
+ stateDeclarations.put(decl.id(), decl);
+ }
+
+ public void addStateDeclarations(Iterable<StateDeclaration> decls) {
+ for (StateDeclaration decl : decls) {
+ addStateDeclaration(decl);
+ }
+ }
+
+ public void addTimerDeclaration(TimerDeclaration decl) {
+ timerDeclarations.put(decl.id(), decl);
+ }
+
+ public void addTimerDeclarations(Iterable<TimerDeclaration> decls) {
+ for (TimerDeclaration decl : decls) {
+ addTimerDeclaration(decl);
+ }
+ }
+ }
+
+ /**
+ * The context of analysis within a particular method.
+ *
+ * <p>It contains much of the information that eventually becomes part of the {@link
+ * DoFnSignature.MethodWithExtraParameters}, but in an intermediate state.
+ */
+ private static class MethodAnalysisContext {
+
+ private final Map<String, StateParameter> stateParameters = new HashMap<>();
+ private final Map<String, TimerParameter> timerParameters = new HashMap<>();
+ private final List<Parameter> extraParameters = new ArrayList<>();
+
+ private MethodAnalysisContext() {}
+
+ /** State parameters declared in this context, keyed by {@link StateId}. */
+ public Map<String, StateParameter> getStateParameters() {
+ return Collections.unmodifiableMap(stateParameters);
+ }
+
+ /** Timer parameters declared in this context, keyed by {@link TimerId}. */
+ public Map<String, TimerParameter> getTimerParameters() {
+ return Collections.unmodifiableMap(timerParameters);
+ }
+
+ /** Extra parameters in their entirety. Unmodifiable. */
+ public List<Parameter> getExtraParameters() {
+ return Collections.unmodifiableList(extraParameters);
+ }
+
+ /**
+ * Returns an {@link MethodAnalysisContext} like this one but including the provided {@link
+ * StateParameter}.
+ */
+ public void addParameter(Parameter param) {
+ extraParameters.add(param);
+
+ if (param instanceof StateParameter) {
+ StateParameter stateParameter = (StateParameter) param;
+ stateParameters.put(stateParameter.referent().id(), stateParameter);
+ }
+ if (param instanceof TimerParameter) {
+ TimerParameter timerParameter = (TimerParameter) param;
+ timerParameters.put(timerParameter.referent().id(), timerParameter);
+ }
+ }
+
+ /** Create an empty context, with no declarations. */
+ public static MethodAnalysisContext create() {
+ return new MethodAnalysisContext();
+ }
+ }
+
+ @AutoValue
+ abstract static class ParameterDescription {
+ public abstract Method getMethod();
+ public abstract int getIndex();
+ public abstract TypeDescriptor<?> getType();
+ public abstract List<Annotation> getAnnotations();
+
+ public static ParameterDescription of(
+ Method method, int index, TypeDescriptor<?> type, List<Annotation> annotations) {
+ return new AutoValue_DoFnSignatures_ParameterDescription(method, index, type, annotations);
+ }
+
+ public static ParameterDescription of(
+ Method method, int index, TypeDescriptor<?> type, Annotation[] annotations) {
+ return new AutoValue_DoFnSignatures_ParameterDescription(
+ method, index, type, Arrays.asList(annotations));
+ }
+ }
+
/** Analyzes a given {@link DoFn} class and extracts its {@link DoFnSignature}. */
private static DoFnSignature parseSignature(Class<? extends DoFn<?, ?>> fnClass) {
- DoFnSignature.Builder builder = DoFnSignature.builder();
+ DoFnSignature.Builder signatureBuilder = DoFnSignature.builder();
ErrorReporter errors = new ErrorReporter(null, fnClass.getName());
errors.checkArgument(DoFn.class.isAssignableFrom(fnClass), "Must be subtype of DoFn");
- builder.setFnClass(fnClass);
+ signatureBuilder.setFnClass(fnClass);
TypeDescriptor<? extends DoFn<?, ?>> fnT = TypeDescriptor.of(fnClass);
@@ -106,11 +230,9 @@ public class DoFnSignatures {
// Find the state and timer declarations in advance of validating
// method parameter lists
- Map<String, StateDeclaration> stateDeclarations = analyzeStateDeclarations(errors, fnClass);
- builder.setStateDeclarations(stateDeclarations);
-
- Map<String, TimerDeclaration> timerDeclarations = analyzeTimerDeclarations(errors, fnClass);
- builder.setTimerDeclarations(timerDeclarations);
+ FnAnalysisContext fnContext = FnAnalysisContext.create();
+ fnContext.addStateDeclarations(analyzeStateDeclarations(errors, fnClass).values());
+ fnContext.addTimerDeclarations(analyzeTimerDeclarations(errors, fnClass).values());
Method processElementMethod =
findAnnotatedMethod(errors, DoFn.ProcessElement.class, fnClass, true);
@@ -135,12 +257,12 @@ public class DoFnSignatures {
for (Method onTimerMethod : onTimerMethods) {
String id = onTimerMethod.getAnnotation(DoFn.OnTimer.class).value();
errors.checkArgument(
- timerDeclarations.containsKey(id),
+ fnContext.getTimerDeclarations().containsKey(id),
"Callback %s is for for undeclared timer %s",
onTimerMethod,
id);
- TimerDeclaration timerDecl = timerDeclarations.get(id);
+ TimerDeclaration timerDecl = fnContext.getTimerDeclarations().get(id);
errors.checkArgument(
timerDecl.field().getDeclaringClass().equals(onTimerMethod.getDeclaringClass()),
"Callback %s is for timer %s declared in a different class %s."
@@ -149,13 +271,14 @@ public class DoFnSignatures {
id,
timerDecl.field().getDeclaringClass().getCanonicalName());
- onTimerMethodMap.put(id, OnTimerMethod.create(onTimerMethod, id, Collections.EMPTY_LIST));
+ onTimerMethodMap.put(
+ id, analyzeOnTimerMethod(errors, fnT, onTimerMethod, id, outputT, fnContext));
}
- builder.setOnTimerMethods(onTimerMethodMap);
+ signatureBuilder.setOnTimerMethods(onTimerMethodMap);
// Check the converse - that all timers have a callback. This could be relaxed to only
// those timers used in methods, once method parameter lists support timers.
- for (TimerDeclaration decl : timerDeclarations.values()) {
+ for (TimerDeclaration decl : fnContext.getTimerDeclarations().values()) {
errors.checkArgument(
onTimerMethodMap.containsKey(decl.id()),
"No callback registered via %s for timer %s",
@@ -172,30 +295,29 @@ public class DoFnSignatures {
processElementMethod,
inputT,
outputT,
- stateDeclarations,
- timerDeclarations);
- builder.setProcessElement(processElement);
+ fnContext);
+ signatureBuilder.setProcessElement(processElement);
if (startBundleMethod != null) {
ErrorReporter startBundleErrors = errors.forMethod(DoFn.StartBundle.class, startBundleMethod);
- builder.setStartBundle(
+ signatureBuilder.setStartBundle(
analyzeBundleMethod(startBundleErrors, fnT, startBundleMethod, inputT, outputT));
}
if (finishBundleMethod != null) {
ErrorReporter finishBundleErrors =
errors.forMethod(DoFn.FinishBundle.class, finishBundleMethod);
- builder.setFinishBundle(
+ signatureBuilder.setFinishBundle(
analyzeBundleMethod(finishBundleErrors, fnT, finishBundleMethod, inputT, outputT));
}
if (setupMethod != null) {
- builder.setSetup(
+ signatureBuilder.setSetup(
analyzeLifecycleMethod(errors.forMethod(DoFn.Setup.class, setupMethod), setupMethod));
}
if (teardownMethod != null) {
- builder.setTeardown(
+ signatureBuilder.setTeardown(
analyzeLifecycleMethod(
errors.forMethod(DoFn.Teardown.class, teardownMethod), teardownMethod));
}
@@ -205,7 +327,7 @@ public class DoFnSignatures {
if (getInitialRestrictionMethod != null) {
getInitialRestrictionErrors =
errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestrictionMethod);
- builder.setGetInitialRestriction(
+ signatureBuilder.setGetInitialRestriction(
getInitialRestriction =
analyzeGetInitialRestrictionMethod(
getInitialRestrictionErrors, fnT, getInitialRestrictionMethod, inputT));
@@ -215,7 +337,7 @@ public class DoFnSignatures {
if (splitRestrictionMethod != null) {
ErrorReporter splitRestrictionErrors =
errors.forMethod(DoFn.SplitRestriction.class, splitRestrictionMethod);
- builder.setSplitRestriction(
+ signatureBuilder.setSplitRestriction(
splitRestriction =
analyzeSplitRestrictionMethod(
splitRestrictionErrors, fnT, splitRestrictionMethod, inputT));
@@ -225,7 +347,7 @@ public class DoFnSignatures {
if (getRestrictionCoderMethod != null) {
ErrorReporter getRestrictionCoderErrors =
errors.forMethod(DoFn.GetRestrictionCoder.class, getRestrictionCoderMethod);
- builder.setGetRestrictionCoder(
+ signatureBuilder.setGetRestrictionCoder(
getRestrictionCoder =
analyzeGetRestrictionCoderMethod(
getRestrictionCoderErrors, fnT, getRestrictionCoderMethod));
@@ -234,13 +356,16 @@ public class DoFnSignatures {
DoFnSignature.NewTrackerMethod newTracker = null;
if (newTrackerMethod != null) {
ErrorReporter newTrackerErrors = errors.forMethod(DoFn.NewTracker.class, newTrackerMethod);
- builder.setNewTracker(
+ signatureBuilder.setNewTracker(
newTracker = analyzeNewTrackerMethod(newTrackerErrors, fnT, newTrackerMethod));
}
- builder.setIsBoundedPerElement(inferBoundedness(fnT, processElement, errors));
+ signatureBuilder.setIsBoundedPerElement(inferBoundedness(fnT, processElement, errors));
- DoFnSignature signature = builder.build();
+ signatureBuilder.setStateDeclarations(fnContext.getStateDeclarations());
+ signatureBuilder.setTimerDeclarations(fnContext.getTimerDeclarations());
+
+ DoFnSignature signature = signatureBuilder.build();
// Additional validation for splittable DoFn's.
if (processElement.isSplittable()) {
@@ -452,14 +577,49 @@ public class DoFnSignatures {
}
@VisibleForTesting
+ static DoFnSignature.OnTimerMethod analyzeOnTimerMethod(
+ ErrorReporter errors,
+ TypeDescriptor<? extends DoFn<?, ?>> fnClass,
+ Method m,
+ String timerId,
+ TypeDescriptor<?> outputT,
+ FnAnalysisContext fnContext) {
+ errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void");
+
+ Type[] params = m.getGenericParameterTypes();
+
+ MethodAnalysisContext methodContext = MethodAnalysisContext.create();
+
+ List<DoFnSignature.Parameter> extraParameters = new ArrayList<>();
+ TypeDescriptor<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
+ ErrorReporter onTimerErrors = errors.forMethod(DoFn.OnTimer.class, m);
+ for (int i = 0; i < params.length; ++i) {
+ extraParameters.add(
+ analyzeExtraParameter(
+ onTimerErrors,
+ fnContext,
+ methodContext,
+ fnClass,
+ ParameterDescription.of(
+ m,
+ i,
+ fnClass.resolveType(params[i]),
+ Arrays.asList(m.getParameterAnnotations()[i])),
+ null /* restriction type not applicable */,
+ expectedOutputReceiverT));
+ }
+
+ return DoFnSignature.OnTimerMethod.create(m, timerId, extraParameters);
+ }
+
+ @VisibleForTesting
static DoFnSignature.ProcessElementMethod analyzeProcessElementMethod(
ErrorReporter errors,
TypeDescriptor<? extends DoFn<?, ?>> fnClass,
Method m,
TypeDescriptor<?> inputT,
TypeDescriptor<?> outputT,
- Map<String, StateDeclaration> stateDeclarations,
- Map<String, TimerDeclaration> timerDeclarations) {
+ FnAnalysisContext fnContext) {
errors.checkArgument(
void.class.equals(m.getReturnType())
|| DoFn.ProcessContinuation.class.equals(m.getReturnType()),
@@ -468,6 +628,8 @@ public class DoFnSignatures {
TypeDescriptor<?> processContextT = doFnProcessContextTypeOf(inputT, outputT);
+ MethodAnalysisContext methodContext = MethodAnalysisContext.create();
+
Type[] params = m.getGenericParameterTypes();
TypeDescriptor<?> contextT = null;
if (params.length > 0) {
@@ -478,192 +640,211 @@ public class DoFnSignatures {
"Must take %s as the first argument",
formatType(processContextT));
- List<DoFnSignature.Parameter> extraParameters = new ArrayList<>();
- Map<String, DoFnSignature.Parameter> stateParameters = new HashMap<>();
- Map<String, DoFnSignature.Parameter> timerParameters = new HashMap<>();
- TypeDescriptor<?> trackerT = null;
-
+ TypeDescriptor<?> trackerT = getTrackerType(fnClass, m);
TypeDescriptor<?> expectedInputProviderT = inputProviderTypeOf(inputT);
TypeDescriptor<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
for (int i = 1; i < params.length; ++i) {
- TypeDescriptor<?> paramT = fnClass.resolveType(params[i]);
- Class<?> rawType = paramT.getRawType();
- if (rawType.equals(BoundedWindow.class)) {
- errors.checkArgument(
- !extraParameters.contains(DoFnSignature.Parameter.boundedWindow()),
- "Multiple %s parameters",
- BoundedWindow.class.getSimpleName());
- extraParameters.add(DoFnSignature.Parameter.boundedWindow());
- } else if (rawType.equals(DoFn.InputProvider.class)) {
- errors.checkArgument(
- !extraParameters.contains(DoFnSignature.Parameter.inputProvider()),
- "Multiple %s parameters",
- DoFn.InputProvider.class.getSimpleName());
- errors.checkArgument(
- paramT.equals(expectedInputProviderT),
- "Wrong type of %s parameter: %s, should be %s",
- DoFn.InputProvider.class.getSimpleName(),
- formatType(paramT),
- formatType(expectedInputProviderT));
- extraParameters.add(DoFnSignature.Parameter.inputProvider());
- } else if (rawType.equals(DoFn.OutputReceiver.class)) {
- errors.checkArgument(
- !extraParameters.contains(DoFnSignature.Parameter.outputReceiver()),
- "Multiple %s parameters",
- DoFn.OutputReceiver.class.getSimpleName());
- errors.checkArgument(
- paramT.equals(expectedOutputReceiverT),
- "Wrong type of %s parameter: %s, should be %s",
- DoFn.OutputReceiver.class.getSimpleName(),
- formatType(paramT),
- formatType(expectedOutputReceiverT));
- extraParameters.add(DoFnSignature.Parameter.outputReceiver());
- } else if (Timer.class.equals(rawType)) {
- // m.getParameters() is not available until Java 8
- Annotation[] annotations = m.getParameterAnnotations()[i];
- String id = null;
- for (Annotation anno : annotations) {
- if (anno.annotationType().equals(DoFn.TimerId.class)) {
- id = ((DoFn.TimerId) anno).value();
- break;
- }
- }
- errors.checkArgument(
- id != null,
- "%s parameter of type %s at index %s missing %s annotation",
- fnClass.getRawType().getName(),
- params[i],
- i,
- DoFn.TimerId.class.getSimpleName());
- errors.checkArgument(
- !timerParameters.containsKey(id),
- "%s parameter of type %s at index %s duplicates %s(\"%s\") on other parameter",
- fnClass.getRawType().getName(),
- params[i],
- i,
- DoFn.TimerId.class.getSimpleName(),
- id);
-
- TimerDeclaration timerDecl = timerDeclarations.get(id);
- errors.checkArgument(
- timerDecl != null,
- "%s parameter of type %s at index %s references undeclared %s \"%s\"",
- fnClass.getRawType().getName(),
- params[i],
- i,
- TimerId.class.getSimpleName(),
- id);
+ Parameter extraParam =
+ analyzeExtraParameter(
+ errors.forMethod(DoFn.ProcessElement.class, m),
+ fnContext,
+ methodContext,
+ fnClass,
+ ParameterDescription.of(
+ m,
+ i,
+ fnClass.resolveType(params[i]),
+ Arrays.asList(m.getParameterAnnotations()[i])),
+ expectedInputProviderT,
+ expectedOutputReceiverT);
+
+ methodContext.addParameter(extraParam);
+ }
- errors.checkArgument(
- timerDecl.field().getDeclaringClass().equals(m.getDeclaringClass()),
- "Method %s has %s parameter at index %s for timer %s"
- + " declared in a different class %s."
- + " Timers may be referenced only in the lexical scope where they are declared.",
- m,
- Timer.class.getSimpleName(),
- i,
- id,
- timerDecl.field().getDeclaringClass().getName());
+ // A splittable DoFn can not have any other extra context parameters.
+ if (methodContext.getExtraParameters().contains(DoFnSignature.Parameter.restrictionTracker())) {
+ errors.checkArgument(
+ methodContext.getExtraParameters().size() == 1,
+ "Splittable DoFn must not have any extra arguments, but has: %s",
+ trackerT,
+ methodContext.getExtraParameters());
+ }
- DoFnSignature.Parameter.TimerParameter timerParameter = Parameter.timerParameter(timerDecl);
- timerParameters.put(id, timerParameter);
- extraParameters.add(timerParameter);
+ return DoFnSignature.ProcessElementMethod.create(
+ m,
+ methodContext.getExtraParameters(),
+ trackerT,
+ DoFn.ProcessContinuation.class.equals(m.getReturnType()));
+ }
- } else if (RestrictionTracker.class.isAssignableFrom(rawType)) {
- errors.checkArgument(
- !extraParameters.contains(DoFnSignature.Parameter.restrictionTracker()),
- "Multiple %s parameters",
- RestrictionTracker.class.getSimpleName());
- extraParameters.add(DoFnSignature.Parameter.restrictionTracker());
- trackerT = paramT;
- } else if (State.class.isAssignableFrom(rawType)) {
- // m.getParameters() is not available until Java 8
- Annotation[] annotations = m.getParameterAnnotations()[i];
- String id = null;
- for (Annotation anno : annotations) {
- if (anno.annotationType().equals(DoFn.StateId.class)) {
- id = ((DoFn.StateId) anno).value();
- break;
- }
- }
- errors.checkArgument(
- id != null,
- "%s parameter of type %s at index %s missing %s annotation",
- fnClass.getRawType().getName(),
- params[i],
- i,
- DoFn.StateId.class.getSimpleName());
+ private static Parameter analyzeExtraParameter(
+ ErrorReporter methodErrors,
+ FnAnalysisContext fnContext,
+ MethodAnalysisContext methodContext,
+ TypeDescriptor<? extends DoFn<?, ?>> fnClass,
+ ParameterDescription param,
+ TypeDescriptor<?> expectedInputProviderT,
+ TypeDescriptor<?> expectedOutputReceiverT) {
+ TypeDescriptor<?> paramT = param.getType();
+ Class<?> rawType = paramT.getRawType();
+
+ ErrorReporter paramErrors = methodErrors.forParameter(param);
+
+ if (rawType.equals(BoundedWindow.class)) {
+ methodErrors.checkArgument(
+ !methodContext.getExtraParameters().contains(Parameter.boundedWindow()),
+ "Multiple %s parameters",
+ BoundedWindow.class.getSimpleName());
+ return Parameter.boundedWindow();
+ } else if (rawType.equals(DoFn.InputProvider.class)) {
+ methodErrors.checkArgument(
+ !methodContext.getExtraParameters().contains(Parameter.inputProvider()),
+ "Multiple %s parameters",
+ DoFn.InputProvider.class.getSimpleName());
+ paramErrors.checkArgument(
+ paramT.equals(expectedInputProviderT),
+ "%s is for %s when it should be %s",
+ DoFn.InputProvider.class.getSimpleName(),
+ formatType(paramT),
+ formatType(expectedInputProviderT));
+ return Parameter.inputProvider();
+ } else if (rawType.equals(DoFn.OutputReceiver.class)) {
+ methodErrors.checkArgument(
+ !methodContext.getExtraParameters().contains(Parameter.outputReceiver()),
+ "Multiple %s parameters",
+ DoFn.OutputReceiver.class.getSimpleName());
+ paramErrors.checkArgument(
+ paramT.equals(expectedOutputReceiverT),
+ "%s is for %s when it should be %s",
+ DoFn.OutputReceiver.class.getSimpleName(),
+ formatType(paramT),
+ formatType(expectedOutputReceiverT));
+ return Parameter.outputReceiver();
+
+ } else if (RestrictionTracker.class.isAssignableFrom(rawType)) {
+ methodErrors.checkArgument(
+ !methodContext.getExtraParameters().contains(Parameter.restrictionTracker()),
+ "Multiple %s parameters",
+ RestrictionTracker.class.getSimpleName());
+ return Parameter.restrictionTracker();
+
+ } else if (rawType.equals(Timer.class)) {
+ // m.getParameters() is not available until Java 8
+ String id = getTimerId(param.getAnnotations());
+
+ paramErrors.checkArgument(
+ id != null,
+ "%s missing %s annotation",
+ Timer.class.getSimpleName(),
+ TimerId.class.getSimpleName());
+
+ paramErrors.checkArgument(
+ !methodContext.getTimerParameters().containsKey(id),
+ "duplicate %s: \"%s\"",
+ TimerId.class.getSimpleName(),
+ id);
- errors.checkArgument(
- !stateParameters.containsKey(id),
- "%s parameter of type %s at index %s duplicates %s(\"%s\") on other parameter",
- fnClass.getRawType().getName(),
- params[i],
- i,
- DoFn.StateId.class.getSimpleName(),
- id);
+ TimerDeclaration timerDecl = fnContext.getTimerDeclarations().get(id);
+ paramErrors.checkArgument(
+ timerDecl != null,
+ "reference to undeclared %s: \"%s\"",
+ TimerId.class.getSimpleName(),
+ id);
- // By static typing this is already a well-formed State subclass
- TypeDescriptor<? extends State> stateType =
- (TypeDescriptor<? extends State>)
- TypeDescriptor.of(fnClass.getType())
- .resolveType(params[i]);
+ paramErrors.checkArgument(
+ timerDecl.field().getDeclaringClass().equals(param.getMethod().getDeclaringClass()),
+ "%s %s declared in a different class %s."
+ + " Timers may be referenced only in the lexical scope where they are declared.",
+ TimerId.class.getSimpleName(),
+ id,
+ timerDecl.field().getDeclaringClass().getName());
+
+ return Parameter.timerParameter(timerDecl);
+
+ } else if (State.class.isAssignableFrom(rawType)) {
+ // m.getParameters() is not available until Java 8
+ String id = getStateId(param.getAnnotations());
+ paramErrors.checkArgument(
+ id != null,
+ "missing %s annotation",
+ DoFn.StateId.class.getSimpleName());
+
+ paramErrors.checkArgument(
+ !methodContext.getStateParameters().containsKey(id),
+ "duplicate %s: \"%s\"",
+ DoFn.StateId.class.getSimpleName(),
+ id);
- StateDeclaration stateDecl = stateDeclarations.get(id);
- errors.checkArgument(
- stateDecl != null,
- "%s parameter of type %s at index %s references undeclared %s \"%s\"",
- fnClass.getRawType().getName(),
- params[i],
- i,
- DoFn.StateId.class.getSimpleName(),
- id);
+ // By static typing this is already a well-formed State subclass
+ TypeDescriptor<? extends State> stateType = (TypeDescriptor<? extends State>) param.getType();
- errors.checkArgument(
- stateDecl.stateType().equals(stateType),
- "%s parameter at index %s has type %s but is a reference to StateId %s of type %s",
- fnClass.getRawType().getName(),
- i,
- params[i],
- id,
- stateDecl.stateType());
+ StateDeclaration stateDecl = fnContext.getStateDeclarations().get(id);
+ paramErrors.checkArgument(
+ stateDecl != null,
+ "reference to undeclared %s: \"%s\"",
+ DoFn.StateId.class.getSimpleName(),
+ id);
- errors.checkArgument(
- stateDecl.field().getDeclaringClass().equals(m.getDeclaringClass()),
- "Method %s has State parameter at index %s for state %s"
- + " declared in a different class %s."
- + " State may be referenced only in the class where it is declared.",
- m,
- i,
- id,
- stateDecl.field().getDeclaringClass().getName());
-
- DoFnSignature.Parameter.StateParameter stateParameter = Parameter.stateParameter(stateDecl);
- stateParameters.put(id, stateParameter);
- extraParameters.add(stateParameter);
- } else {
- List<String> allowedParamTypes =
- Arrays.asList(
- formatType(new TypeDescriptor<BoundedWindow>() {}),
- formatType(new TypeDescriptor<RestrictionTracker<?>>() {}));
- errors.throwIllegalArgument(
- "%s is not a valid context parameter. Should be one of %s",
- formatType(paramT), allowedParamTypes);
+ paramErrors.checkArgument(
+ stateDecl.stateType().equals(stateType),
+ "reference to %s %s with different type %s",
+ StateId.class.getSimpleName(),
+ id,
+ stateDecl.stateType());
+
+ paramErrors.checkArgument(
+ stateDecl.field().getDeclaringClass().equals(param.getMethod().getDeclaringClass()),
+ "%s %s declared in a different class %s."
+ + " State may be referenced only in the class where it is declared.",
+ StateId.class.getSimpleName(),
+ id,
+ stateDecl.field().getDeclaringClass().getName());
+
+ return Parameter.stateParameter(stateDecl);
+ } else {
+ List<String> allowedParamTypes =
+ Arrays.asList(
+ formatType(new TypeDescriptor<BoundedWindow>() {}),
+ formatType(new TypeDescriptor<RestrictionTracker<?>>() {}));
+ paramErrors.throwIllegalArgument(
+ "%s is not a valid context parameter. Should be one of %s",
+ formatType(paramT), allowedParamTypes);
+ // Unreachable
+ return null;
+ }
+ }
+
+ @Nullable
+ private static String getTimerId(List<Annotation> annotations) {
+ for (Annotation anno : annotations) {
+ if (anno.annotationType().equals(DoFn.TimerId.class)) {
+ return ((DoFn.TimerId) anno).value();
}
}
+ return null;
+ }
- // A splittable DoFn can not have any other extra context parameters.
- if (extraParameters.contains(DoFnSignature.Parameter.restrictionTracker())) {
- errors.checkArgument(
- extraParameters.size() == 1,
- "Splittable DoFn must not have any extra arguments apart from BoundedWindow, but has: %s",
- trackerT,
- extraParameters);
+ @Nullable
+ private static String getStateId(List<Annotation> annotations) {
+ for (Annotation anno : annotations) {
+ if (anno.annotationType().equals(DoFn.StateId.class)) {
+ return ((DoFn.StateId) anno).value();
+ }
}
+ return null;
+ }
- return DoFnSignature.ProcessElementMethod.create(
- m, extraParameters, trackerT, DoFn.ProcessContinuation.class.equals(m.getReturnType()));
+ @Nullable
+ private static TypeDescriptor<?> getTrackerType(TypeDescriptor<?> fnClass, Method method) {
+ Type[] params = method.getGenericParameterTypes();
+ for (int i = 0; i < params.length; i++) {
+ TypeDescriptor<?> paramT = fnClass.resolveType(params[i]);
+ if (RestrictionTracker.class.isAssignableFrom(paramT.getRawType())) {
+ return paramT;
+ }
+ }
+ return null;
}
@VisibleForTesting
@@ -905,7 +1086,7 @@ public class DoFnSignatures {
return matches;
}
- private static ImmutableMap<String, DoFnSignature.StateDeclaration> analyzeStateDeclarations(
+ private static Map<String, DoFnSignature.StateDeclaration> analyzeStateDeclarations(
ErrorReporter errors,
Class<?> fnClazz) {
@@ -1015,6 +1196,14 @@ public class DoFnSignatures {
annotation.getSimpleName(), (method == null) ? "(absent)" : format(method)));
}
+ ErrorReporter forParameter(ParameterDescription param) {
+ return new ErrorReporter(
+ this,
+ String.format(
+ "parameter of type %s at index %s",
+ param.getType(), param.getIndex()));
+ }
+
void throwIllegalArgument(String message, Object... args) {
throw new IllegalArgumentException(label + ": " + String.format(message, args));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bf6d92c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
index 329a099..6cbc95e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
@@ -96,9 +96,9 @@ public class DoFnSignaturesProcessElementTest {
@Test
public void testBadGenericsTwoArgs() throws Exception {
thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(
- "Wrong type of OutputReceiver parameter: "
- + "OutputReceiver<Integer>, should be OutputReceiver<String>");
+ thrown.expectMessage("OutputReceiver<Integer>");
+ thrown.expectMessage("should be");
+ thrown.expectMessage("OutputReceiver<String>");
analyzeProcessElementMethod(
new AnonymousMethod() {
@@ -112,9 +112,9 @@ public class DoFnSignaturesProcessElementTest {
@Test
public void testBadGenericWildCards() throws Exception {
thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(
- "Wrong type of OutputReceiver parameter: "
- + "OutputReceiver<? super Integer>, should be OutputReceiver<String>");
+ thrown.expectMessage("OutputReceiver<? super Integer>");
+ thrown.expectMessage("should be");
+ thrown.expectMessage("OutputReceiver<String>");
analyzeProcessElementMethod(
new AnonymousMethod() {
@@ -137,9 +137,9 @@ public class DoFnSignaturesProcessElementTest {
@Test
public void testBadTypeVariables() throws Exception {
thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(
- "Wrong type of OutputReceiver parameter: "
- + "OutputReceiver<InputT>, should be OutputReceiver<OutputT>");
+ thrown.expectMessage("OutputReceiver<InputT>");
+ thrown.expectMessage("should be");
+ thrown.expectMessage("OutputReceiver<OutputT>");
DoFnSignatures.INSTANCE.getSignature(BadTypeVariables.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bf6d92c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index 573701b..0751b59 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -88,7 +88,6 @@ public class DoFnSignaturesSplittableDoFnTest {
public void testSplittableProcessElementMustNotHaveOtherParams() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("must not have any extra arguments");
- thrown.expectMessage("BoundedWindow");
DoFnSignature.ProcessElementMethod signature =
analyzeProcessElementMethod(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bf6d92c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 52ecb2a..4187e0a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.err
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -29,11 +30,11 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.OnTimer;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerSpec;
@@ -249,7 +250,7 @@ public class DoFnSignaturesTest {
@Test
public void testTimerParameterDuplicate() throws Exception {
thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("duplicates");
+ thrown.expectMessage("duplicate");
thrown.expectMessage("my-id");
thrown.expectMessage("myProcessElement");
thrown.expectMessage("index 2");
@@ -291,6 +292,28 @@ public class DoFnSignaturesTest {
}
@Test
+ public void testWindowParamOnTimer() throws Exception {
+ final String timerId = "some-timer-id";
+
+ DoFnSignature sig =
+ DoFnSignatures.INSTANCE.getSignature(new DoFn<String, String>() {
+ @TimerId(timerId)
+ private final TimerSpec myfield1 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void process(ProcessContext c) {}
+
+ @OnTimer(timerId)
+ public void onTimer(BoundedWindow w) {}
+ }.getClass());
+
+ assertThat(sig.onTimerMethods().get(timerId).extraParameters().size(), equalTo(1));
+ assertThat(
+ sig.onTimerMethods().get(timerId).extraParameters().get(0),
+ instanceOf(DoFnSignature.Parameter.BoundedWindowParameter.class));
+ }
+
+ @Test
public void testDeclAndUsageOfTimerInSuperclass() throws Exception {
DoFnSignature sig =
DoFnSignatures.INSTANCE.getSignature(new DoFnOverridingAbstractTimerUse().getClass());
@@ -525,7 +548,7 @@ public class DoFnSignaturesTest {
@Test
public void testStateParameterDuplicate() throws Exception {
thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("duplicates");
+ thrown.expectMessage("duplicate");
thrown.expectMessage("my-id");
thrown.expectMessage("myProcessElement");
thrown.expectMessage("index 2");
@@ -549,7 +572,8 @@ public class DoFnSignaturesTest {
public void testStateParameterWrongStateType() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("WatermarkHoldState");
- thrown.expectMessage("but is a reference to");
+ thrown.expectMessage("reference to");
+ thrown.expectMessage("different type");
thrown.expectMessage("ValueState");
thrown.expectMessage("my-id");
thrown.expectMessage("myProcessElement");
@@ -572,7 +596,8 @@ public class DoFnSignaturesTest {
public void testStateParameterWrongGenericType() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("ValueState<java.lang.String>");
- thrown.expectMessage("but is a reference to");
+ thrown.expectMessage("reference to");
+ thrown.expectMessage("different type");
thrown.expectMessage("ValueState<java.lang.Integer>");
thrown.expectMessage("my-id");
thrown.expectMessage("myProcessElement");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bf6d92c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
index 49e2ba7..b7d137a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
@@ -18,9 +18,9 @@
package org.apache.beam.sdk.transforms.reflect;
import java.lang.reflect.Method;
-import java.util.Collections;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures.FnAnalysisContext;
import org.apache.beam.sdk.values.TypeDescriptor;
/** Utilities for use in {@link DoFnSignatures} tests. */
@@ -61,7 +61,6 @@ class DoFnSignaturesTestUtils {
method.getMethod(),
TypeDescriptor.of(Integer.class),
TypeDescriptor.of(String.class),
- Collections.EMPTY_MAP,
- Collections.EMPTY_MAP);
+ FnAnalysisContext.create());
}
}