You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/08 03:30:12 UTC
[2/5] incubator-beam git commit: Add OnTimerContext parameter support
to DoFnSignature
Add OnTimerContext parameter support to DoFnSignature
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/42b506f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/42b506f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/42b506f0
Branch: refs/heads/master
Commit: 42b506f06dbd73e03a2cfad4e7677e9698b3c020
Parents: 3f8c807
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 20:18:18 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Dec 7 19:22:43 2016 -0800
----------------------------------------------------------------------
.../reflect/ByteBuddyDoFnInvokerFactory.java | 6 ++
.../sdk/transforms/reflect/DoFnSignature.java | 26 +++++-
.../sdk/transforms/reflect/DoFnSignatures.java | 90 ++++++++++++++++----
.../DoFnSignaturesSplittableDoFnTest.java | 3 +-
.../transforms/reflect/DoFnSignaturesTest.java | 47 ++++++++++
5 files changed, 154 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42b506f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 8750d64..3480603 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -69,6 +69,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ContextParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.InputProviderParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OnTimerContextParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ProcessContextParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
@@ -554,6 +555,11 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
}
@Override
+ public StackManipulation dispatch(OnTimerContextParameter p) {
+ throw new UnsupportedOperationException("OnTimerContext is not yet supported.");
+ }
+
+ @Override
public StackManipulation dispatch(WindowParameter p) {
return new StackManipulation.Compound(
simpleExtraContextParameter(WINDOW_PARAMETER_METHOD),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42b506f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 0750949..ccc9ac3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -175,6 +175,8 @@ public abstract class DoFnSignature {
return cases.dispatch((ContextParameter) this);
} else if (this instanceof ProcessContextParameter) {
return cases.dispatch((ProcessContextParameter) this);
+ } else if (this instanceof OnTimerContextParameter) {
+ return cases.dispatch((OnTimerContextParameter) this);
} else if (this instanceof WindowParameter) {
return cases.dispatch((WindowParameter) this);
} else if (this instanceof RestrictionTrackerParameter) {
@@ -200,6 +202,7 @@ public abstract class DoFnSignature {
public interface Cases<ResultT> {
ResultT dispatch(ContextParameter p);
ResultT dispatch(ProcessContextParameter p);
+ ResultT dispatch(OnTimerContextParameter p);
ResultT dispatch(WindowParameter p);
ResultT dispatch(InputProviderParameter p);
ResultT dispatch(OutputReceiverParameter p);
@@ -225,6 +228,11 @@ public abstract class DoFnSignature {
}
@Override
+ public ResultT dispatch(OnTimerContextParameter p) {
+ return dispatchDefault(p);
+ }
+
+ @Override
public ResultT dispatch(WindowParameter p) {
return dispatchDefault(p);
}
@@ -261,12 +269,14 @@ public abstract class DoFnSignature {
new AutoValue_DoFnSignature_Parameter_ContextParameter();
private static final ProcessContextParameter PROCESS_CONTEXT_PARAMETER =
new AutoValue_DoFnSignature_Parameter_ProcessContextParameter();
+ private static final OnTimerContextParameter ON_TIMER_CONTEXT_PARAMETER =
+ new AutoValue_DoFnSignature_Parameter_OnTimerContextParameter();
private static final InputProviderParameter INPUT_PROVIDER_PARAMETER =
new AutoValue_DoFnSignature_Parameter_InputProviderParameter();
private static final OutputReceiverParameter OUTPUT_RECEIVER_PARAMETER =
new AutoValue_DoFnSignature_Parameter_OutputReceiverParameter();
- /** Returns a {@link ProcessContextParameter}. */
+ /** Returns a {@link ContextParameter}. */
public static ContextParameter context() {
return CONTEXT_PARAMETER;
}
@@ -276,6 +286,11 @@ public abstract class DoFnSignature {
return PROCESS_CONTEXT_PARAMETER;
}
+ /** Returns an {@link OnTimerContextParameter}. */
+ public static OnTimerContextParameter onTimerContext() {
+ return ON_TIMER_CONTEXT_PARAMETER;
+ }
+
/** Returns a {@link WindowParameter}. */
public static WindowParameter boundedWindow(TypeDescriptor<? extends BoundedWindow> windowT) {
return new AutoValue_DoFnSignature_Parameter_WindowParameter(windowT);
@@ -334,6 +349,15 @@ public abstract class DoFnSignature {
}
/**
+ * Descriptor for a {@link Parameter} of type {@link DoFn.OnTimerContext}.
+ *
+ * <p>All such descriptors are equal.
+ */
+ @AutoValue
+ public abstract static class OnTimerContextParameter extends Parameter {
+ OnTimerContextParameter() {}
+ }
+ /**
* Descriptor for a {@link Parameter} of type {@link BoundedWindow}.
*
* <p>All such descriptors are equal.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42b506f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 83d67b7..e3ba966 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
@@ -47,7 +48,6 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.StateId;
import org.apache.beam.sdk.transforms.DoFn.TimerId;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ProcessContextParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
@@ -74,6 +74,29 @@ public class DoFnSignatures {
private static final Map<Class<?>, DoFnSignature> signatureCache = new LinkedHashMap<>();
+ private static final Collection<Class<? extends Parameter>>
+ ALLOWED_NON_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS =
+ ImmutableList.of(
+ Parameter.ProcessContextParameter.class,
+ Parameter.WindowParameter.class,
+ Parameter.TimerParameter.class,
+ Parameter.StateParameter.class,
+ Parameter.InputProviderParameter.class,
+ Parameter.OutputReceiverParameter.class);
+
+ private static final Collection<Class<? extends Parameter>>
+ ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS =
+ ImmutableList.of(
+ Parameter.ProcessContextParameter.class, Parameter.RestrictionTrackerParameter.class);
+
+ private static final Collection<Class<? extends Parameter>>
+ ALLOWED_ON_TIMER_PARAMETERS =
+ ImmutableList.of(
+ Parameter.OnTimerContextParameter.class,
+ Parameter.WindowParameter.class,
+ Parameter.TimerParameter.class,
+ Parameter.StateParameter.class);
+
/** @return the {@link DoFnSignature} for the given {@link DoFn} instance. */
public static <FnT extends DoFn<?, ?>> DoFnSignature signatureForDoFn(FnT fn) {
return getSignature(fn.getClass());
@@ -583,6 +606,18 @@ public class DoFnSignatures {
}
/**
+ * Generates a {@link TypeDescriptor} for {@code DoFn<InputT, OutputT>.OnTimerContext} given
+ * {@code InputT} and {@code OutputT}.
+ */
+ private static <InputT, OutputT>
+ TypeDescriptor<DoFn<InputT, OutputT>.OnTimerContext> doFnOnTimerContextTypeOf(
+ TypeDescriptor<InputT> inputT, TypeDescriptor<OutputT> outputT) {
+ return new TypeDescriptor<DoFn<InputT, OutputT>.OnTimerContext>() {}.where(
+ new TypeParameter<InputT>() {}, inputT)
+ .where(new TypeParameter<OutputT>() {}, outputT);
+ }
+
+ /**
* Generates a {@link TypeDescriptor} for {@code DoFn.InputProvider<InputT>} given {@code InputT}.
*/
private static <InputT> TypeDescriptor<DoFn.InputProvider<InputT>> inputProviderTypeOf(
@@ -621,7 +656,7 @@ public class DoFnSignatures {
List<DoFnSignature.Parameter> extraParameters = new ArrayList<>();
ErrorReporter onTimerErrors = errors.forMethod(DoFn.OnTimer.class, m);
for (int i = 0; i < params.length; ++i) {
- extraParameters.add(
+ Parameter parameter =
analyzeExtraParameter(
onTimerErrors,
fnContext,
@@ -633,7 +668,14 @@ public class DoFnSignatures {
fnClass.resolveType(params[i]),
Arrays.asList(m.getParameterAnnotations()[i])),
inputT,
- outputT));
+ outputT);
+
+ checkParameterOneOf(
+ errors,
+ parameter,
+ ALLOWED_ON_TIMER_PARAMETERS);
+
+ extraParameters.add(parameter);
}
return DoFnSignature.OnTimerMethod.create(m, timerId, windowT, extraParameters);
@@ -679,20 +721,15 @@ public class DoFnSignatures {
methodContext.addParameter(extraParam);
}
- // A splittable DoFn can not have any other extra context parameters.
+ // The allowed parameters depend on whether this DoFn is splittable
if (methodContext.hasRestrictionTrackerParameter()) {
- errors.checkArgument(
- Iterables.all(
- methodContext.getExtraParameters(),
- Predicates.or(
- Predicates.instanceOf(RestrictionTrackerParameter.class),
- Predicates.instanceOf(ProcessContextParameter.class))),
- "Splittable %s @%s must have only %s and %s parameters, but has: %s",
- DoFn.class.getSimpleName(),
- DoFn.ProcessElement.class.getSimpleName(),
- DoFn.ProcessContext.class.getSimpleName(),
- RestrictionTracker.class.getSimpleName(),
- methodContext.getExtraParameters());
+ for (Parameter parameter : methodContext.getExtraParameters()) {
+ checkParameterOneOf(errors, parameter, ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS);
+ }
+ } else {
+ for (Parameter parameter : methodContext.getExtraParameters()) {
+ checkParameterOneOf(errors, parameter, ALLOWED_NON_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS);
+ }
}
return DoFnSignature.ProcessElementMethod.create(
@@ -703,6 +740,21 @@ public class DoFnSignatures {
DoFn.ProcessContinuation.class.equals(m.getReturnType()));
}
+ private static void checkParameterOneOf(
+ ErrorReporter errors,
+ Parameter parameter,
+ Collection<Class<? extends Parameter>> allowedParameterClasses) {
+
+ for (Class<? extends Parameter> paramClass : allowedParameterClasses) {
+ if (paramClass.isAssignableFrom(parameter.getClass())) {
+ return;
+ }
+ }
+
+ // If we get here, none matched
+ errors.throwIllegalArgument("Illegal parameter type: %s", parameter);
+ }
+
private static Parameter analyzeExtraParameter(
ErrorReporter methodErrors,
FnAnalysisContext fnContext,
@@ -714,6 +766,7 @@ public class DoFnSignatures {
TypeDescriptor<?> expectedProcessContextT = doFnProcessContextTypeOf(inputT, outputT);
TypeDescriptor<?> expectedContextT = doFnContextTypeOf(inputT, outputT);
+ TypeDescriptor<?> expectedOnTimerContextT = doFnOnTimerContextTypeOf(inputT, outputT);
TypeDescriptor<?> expectedInputProviderT = inputProviderTypeOf(inputT);
TypeDescriptor<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
@@ -732,6 +785,11 @@ public class DoFnSignatures {
"Must take %s as the Context argument",
formatType(expectedContextT));
return Parameter.context();
+ } else if (rawType.equals(DoFn.OnTimerContext.class)) {
+ methodErrors.checkArgument(paramT.equals(expectedOnTimerContextT),
+ "Must take %s as the OnTimerContext argument",
+ formatType(expectedOnTimerContextT));
+ return Parameter.onTimerContext();
} else if (BoundedWindow.class.isAssignableFrom(rawType)) {
methodErrors.checkArgument(
!methodContext.hasWindowParameter(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42b506f0/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index 91f2d1b..7b594c9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -90,7 +90,8 @@ public class DoFnSignaturesSplittableDoFnTest {
@Test
public void testSplittableProcessElementMustNotHaveOtherParams() throws Exception {
thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("must have only ProcessContext and RestrictionTracker parameters");
+ thrown.expectMessage("Illegal parameter");
+ thrown.expectMessage("BoundedWindow");
DoFnSignature.ProcessElementMethod signature =
analyzeProcessElementMethod(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42b506f0/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 1381cd9..69d4058 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -442,6 +442,53 @@ public class DoFnSignaturesTest {
}
@Test
+ public void testSimpleTimerWithContext() throws Exception {
+ DoFnSignature sig =
+ DoFnSignatures.getSignature(
+ new DoFn<KV<String, Integer>, Long>() {
+ @TimerId("foo")
+ private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void foo(ProcessContext context) {}
+
+ @OnTimer("foo")
+ public void onFoo(OnTimerContext c) {}
+ }.getClass());
+
+ assertThat(sig.timerDeclarations().size(), equalTo(1));
+ DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("foo");
+
+ assertThat(decl.id(), equalTo("foo"));
+ assertThat(decl.field().getName(), equalTo("bizzle"));
+
+ assertThat(
+ sig.onTimerMethods().get("foo").extraParameters().get(0),
+ equalTo((Parameter) Parameter.onTimerContext()));
+ }
+
+ @Test
+ public void testProcessElementWithOnTimerContextRejected() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ // The message should at least mention @ProcessElement and OnTimerContext
+ thrown.expectMessage("@" + DoFn.ProcessElement.class.getSimpleName());
+ thrown.expectMessage(DoFn.OnTimerContext.class.getSimpleName());
+
+ DoFnSignature sig =
+ DoFnSignatures.getSignature(
+ new DoFn<KV<String, Integer>, Long>() {
+ @TimerId("foo")
+ private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void foo(ProcessContext context, OnTimerContext bogus) {}
+
+ @OnTimer("foo")
+ public void onFoo() {}
+ }.getClass());
+ }
+
+ @Test
public void testSimpleTimerIdNamedDoFn() throws Exception {
class DoFnForTestSimpleTimerIdNamedDoFn extends DoFn<KV<String, Integer>, Long> {
@TimerId("foo")