You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/20 20:32:28 UTC
[1/2] incubator-beam git commit: Add analysis of Timer parameters to
DoFn.ProcessElement
Repository: incubator-beam
Updated Branches:
refs/heads/master a69f888e8 -> 1b47a2188
Add analysis of Timer parameters to DoFn.ProcessElement
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/db66cdbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/db66cdbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/db66cdbe
Branch: refs/heads/master
Commit: db66cdbee77320e7c72c43a5f8dd9e640b61e6bc
Parents: 472cf0e
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Oct 19 15:02:36 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 12:33:19 2016 -0700
----------------------------------------------------------------------
.../sdk/transforms/reflect/DoFnSignature.java | 24 ++
.../sdk/transforms/reflect/DoFnSignatures.java | 67 ++++-
.../transforms/reflect/DoFnSignaturesTest.java | 283 ++++++++++++++-----
.../reflect/DoFnSignaturesTestUtils.java | 1 +
4 files changed, 308 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db66cdbe/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 1dc1fe3..6b9e566 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.DoFn.InputProvider;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.DoFn.StateId;
+import org.apache.beam.sdk.transforms.DoFn.TimerId;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BoundedWindowParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -155,6 +156,8 @@ public abstract class DoFnSignature {
return cases.dispatch((OutputReceiverParameter) this);
} else if (this instanceof StateParameter) {
return cases.dispatch((StateParameter) this);
+ } else if (this instanceof TimerParameter) {
+ return cases.dispatch((TimerParameter) this);
} else {
throw new IllegalStateException(
String.format("Attempt to case match on unknown %s subclass %s",
@@ -171,6 +174,7 @@ public abstract class DoFnSignature {
ResultT dispatch(OutputReceiverParameter p);
ResultT dispatch(RestrictionTrackerParameter p);
ResultT dispatch(StateParameter p);
+ ResultT dispatch(TimerParameter p);
/**
* A base class for a visitor with a default method for cases it is not interested in.
@@ -203,6 +207,11 @@ public abstract class DoFnSignature {
public ResultT dispatch(StateParameter p) {
return dispatchDefault(p);
}
+
+ @Override
+ public ResultT dispatch(TimerParameter p) {
+ return dispatchDefault(p);
+ }
}
}
@@ -251,6 +260,10 @@ public abstract class DoFnSignature {
return new AutoValue_DoFnSignature_Parameter_StateParameter(decl);
}
+ public static TimerParameter timerParameter(TimerDeclaration decl) {
+ return new AutoValue_DoFnSignature_Parameter_TimerParameter(decl);
+ }
+
/**
* Descriptor for a {@link Parameter} of type {@link BoundedWindow}.
*
@@ -304,6 +317,17 @@ public abstract class DoFnSignature {
StateParameter() {}
public abstract StateDeclaration referent();
}
+
+ /**
+ * Descriptor for a {@link Parameter} of type {@link Timer}, with an id indicated by
+ * its {@link TimerId} annotation.
+ */
+ @AutoValue
+ public abstract static class TimerParameter extends Parameter {
+ // Package visible for AutoValue
+ TimerParameter() {}
+ public abstract TimerDeclaration referent();
+ }
}
/** Describes a {@link DoFn.ProcessElement} method. */
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db66cdbe/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 038b55d..ea1203a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -43,12 +43,14 @@ import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.TimerId;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerSpec;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.util.state.State;
@@ -166,8 +168,13 @@ public class DoFnSignatures {
errors.forMethod(DoFn.ProcessElement.class, processElementMethod);
DoFnSignature.ProcessElementMethod processElement =
analyzeProcessElementMethod(
- processElementErrors, fnToken, processElementMethod, inputT, outputT,
- stateDeclarations);
+ processElementErrors,
+ fnToken,
+ processElementMethod,
+ inputT,
+ outputT,
+ stateDeclarations,
+ timerDeclarations);
builder.setProcessElement(processElement);
if (startBundleMethod != null) {
@@ -447,7 +454,8 @@ public class DoFnSignatures {
Method m,
TypeToken<?> inputT,
TypeToken<?> outputT,
- Map<String, StateDeclaration> stateDeclarations) {
+ Map<String, StateDeclaration> stateDeclarations,
+ Map<String, TimerDeclaration> timerDeclarations) {
errors.checkArgument(
void.class.equals(m.getReturnType())
|| DoFn.ProcessContinuation.class.equals(m.getReturnType()),
@@ -468,6 +476,7 @@ public class DoFnSignatures {
List<DoFnSignature.Parameter> extraParameters = new ArrayList<>();
Map<String, DoFnSignature.Parameter> stateParameters = new HashMap<>();
+ Map<String, DoFnSignature.Parameter> timerParameters = new HashMap<>();
TypeToken<?> trackerT = null;
TypeToken<?> expectedInputProviderT = inputProviderTypeOf(inputT);
@@ -505,6 +514,58 @@ public class DoFnSignatures {
formatType(paramT),
formatType(expectedOutputReceiverT));
extraParameters.add(DoFnSignature.Parameter.outputReceiver());
+ } else if (Timer.class.equals(rawType)) {
+ // m.getParameters() is not available until Java 8
+ Annotation[] annotations = m.getParameterAnnotations()[i];
+ String id = null;
+ for (Annotation anno : annotations) {
+ if (anno.annotationType().equals(DoFn.TimerId.class)) {
+ id = ((DoFn.TimerId) anno).value();
+ break;
+ }
+ }
+ errors.checkArgument(
+ id != null,
+ "%s parameter of type %s at index %s missing %s annotation",
+ fnClass.getRawType().getName(),
+ params[i],
+ i,
+ DoFn.TimerId.class.getSimpleName());
+
+ errors.checkArgument(
+ !timerParameters.containsKey(id),
+ "%s parameter of type %s at index %s duplicates %s(\"%s\") on other parameter",
+ fnClass.getRawType().getName(),
+ params[i],
+ i,
+ DoFn.TimerId.class.getSimpleName(),
+ id);
+
+ TimerDeclaration timerDecl = timerDeclarations.get(id);
+ errors.checkArgument(
+ timerDecl != null,
+ "%s parameter of type %s at index %s references undeclared %s \"%s\"",
+ fnClass.getRawType().getName(),
+ params[i],
+ i,
+ TimerId.class.getSimpleName(),
+ id);
+
+ errors.checkArgument(
+ timerDecl.field().getDeclaringClass().equals(m.getDeclaringClass()),
+ "Method %s has %s parameter at index %s for timer %s"
+ + " declared in a different class %s."
+ + " Timers may be referenced only in the lexical scope where they are declared.",
+ m,
+ Timer.class.getSimpleName(),
+ i,
+ id,
+ timerDecl.field().getDeclaringClass().getName());
+
+ DoFnSignature.Parameter.TimerParameter timerParameter = Parameter.timerParameter(timerDecl);
+ timerParameters.put(id, timerParameter);
+ extraParameters.add(timerParameter);
+
} else if (RestrictionTracker.class.isAssignableFrom(rawType)) {
errors.checkArgument(
!extraParameters.contains(DoFnSignature.Parameter.restrictionTracker()),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db66cdbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index ad58e80..fe88c3b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -18,7 +18,10 @@
package org.apache.beam.sdk.transforms.reflect;
import static org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.errors;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -27,10 +30,13 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.OnTimer;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerSpec;
import org.apache.beam.sdk.util.TimerSpecs;
import org.apache.beam.sdk.util.state.StateSpec;
@@ -39,6 +45,7 @@ import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
@@ -145,6 +152,7 @@ public class DoFnSignaturesTest {
thrown.expectMessage("TimerId");
thrown.expectMessage("TimerSpec");
thrown.expectMessage("bizzle");
+ thrown.expectMessage(not(mentionsState()));
DoFnSignatures.INSTANCE.getSignature(
new DoFn<String, String>() {
@TimerId("foo")
@@ -159,11 +167,13 @@ public class DoFnSignaturesTest {
public void testTimerIdNoCallback() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("No callback registered");
- thrown.expectMessage("my-timer-id");
+ thrown.expectMessage("my-id");
+ thrown.expectMessage(not(mentionsState()));
+ thrown.expectMessage(mentionsTimers());
DoFnSignature sig =
DoFnSignatures.INSTANCE.getSignature(
new DoFn<KV<String, Integer>, Long>() {
- @TimerId("my-timer-id")
+ @TimerId("my-id")
private final TimerSpec myfield1 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
@@ -176,13 +186,15 @@ public class DoFnSignaturesTest {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Callback");
thrown.expectMessage("undeclared timer");
- thrown.expectMessage("onTimerFoo");
- thrown.expectMessage("my-timer-id");
+ thrown.expectMessage("onFoo");
+ thrown.expectMessage("my-id");
+ thrown.expectMessage(not(mentionsState()));
+ thrown.expectMessage(mentionsTimers());
DoFnSignature sig =
DoFnSignatures.INSTANCE.getSignature(
new DoFn<KV<String, Integer>, Long>() {
- @OnTimer("my-timer-id")
- public void onTimerFoo() {}
+ @OnTimer("my-id")
+ public void onFoo() {}
@ProcessElement
public void foo(ProcessContext context) {}
@@ -191,18 +203,72 @@ public class DoFnSignaturesTest {
@Test
public void testOnTimerDeclaredInSuperclass() throws Exception {
+ class DoFnDeclaringTimerAndProcessElement extends DoFn<KV<String, Integer>, Long> {
+ public static final String TIMER_ID = "my-timer-id";
+
+ @TimerId(TIMER_ID)
+ private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void foo(ProcessContext context) {}
+ }
+
+ DoFnDeclaringTimerAndProcessElement fn =
+ new DoFnDeclaringTimerAndProcessElement() {
+ @OnTimer(DoFnDeclaringTimerAndProcessElement.TIMER_ID)
+ public void onTimerFoo() {}
+ };
+
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Callback");
thrown.expectMessage("declared in a different class");
- thrown.expectMessage("my-timer-id");
+ thrown.expectMessage(DoFnDeclaringTimerAndProcessElement.TIMER_ID);
+ thrown.expectMessage(fn.getClass().getSimpleName());
+ thrown.expectMessage(not(mentionsState()));
+ thrown.expectMessage(mentionsTimers());
+ DoFnSignature sig = DoFnSignatures.INSTANCE.getSignature(fn.getClass());
+ }
+
+ @Test
+ public void testUsageOfTimerDeclaredInSuperclass() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("process");
+ thrown.expectMessage("declared in a different class");
+ thrown.expectMessage(DoFnDeclaringTimerAndCallback.TIMER_ID);
+ thrown.expectMessage(not(mentionsState()));
+ thrown.expectMessage(mentionsTimers());
DoFnSignature sig =
DoFnSignatures.INSTANCE.getSignature(
- new DoFnDeclaringMyTimerId() {
- @OnTimer("my-timer-id")
- public void onTimerFoo() {}
+ new DoFnDeclaringTimerAndCallback() {
+ @ProcessElement
+ public void process(
+ ProcessContext context,
+ @TimerId(DoFnDeclaringTimerAndCallback.TIMER_ID) Timer timer) {}
+ }.getClass());
+ }
+
+ @Test
+ public void testTimerParameterDuplicate() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("duplicates");
+ thrown.expectMessage("my-id");
+ thrown.expectMessage("myProcessElement");
+ thrown.expectMessage("index 2");
+ thrown.expectMessage(not(mentionsState()));
+ DoFnSignature sig =
+ DoFnSignatures.INSTANCE.getSignature(
+ new DoFn<KV<String, Integer>, Long>() {
+ @TimerId("my-id")
+ private final TimerSpec myfield = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
@ProcessElement
- public void foo(ProcessContext context) {}
+ public void myProcessElement(
+ ProcessContext context,
+ @TimerId("my-id") Timer one,
+ @TimerId("my-id") Timer two) {}
+
+ @OnTimer("my-id")
+ public void onWhatever() {}
}.getClass());
}
@@ -211,11 +277,13 @@ public class DoFnSignaturesTest {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Callback");
thrown.expectMessage("declared in a different class");
- thrown.expectMessage("my-timer-id");
+ thrown.expectMessage(DoFnWithOnlyCallback.TIMER_ID);
+ thrown.expectMessage(not(mentionsState()));
+ thrown.expectMessage(mentionsTimers());
DoFnSignature sig =
DoFnSignatures.INSTANCE.getSignature(
- new DoFnUsingMyTimerId() {
- @TimerId("my-timer-id")
+ new DoFnWithOnlyCallback() {
+ @TimerId(DoFnWithOnlyCallback.TIMER_ID)
private final TimerSpec myfield1 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
@@ -223,31 +291,53 @@ public class DoFnSignaturesTest {
}.getClass());
}
+ @Test
+ public void testDeclAndUsageOfTimerInSuperclass() throws Exception {
+ DoFnSignature sig =
+ DoFnSignatures.INSTANCE.getSignature(new DoFnOverridingAbstractTimerUse().getClass());
+
+ assertThat(sig.timerDeclarations().size(), equalTo(1));
+ assertThat(sig.processElement().extraParameters().size(), equalTo(1));
+
+ DoFnSignature.TimerDeclaration decl =
+ sig.timerDeclarations().get(DoFnOverridingAbstractTimerUse.TIMER_ID);
+ TimerParameter timerParam = (TimerParameter) sig.processElement().extraParameters().get(0);
+
+ assertThat(
+ decl.field(),
+ equalTo(DoFnDeclaringTimerAndAbstractUse.class.getDeclaredField("myTimerSpec")));
+
+ // The method we pull out is the superclass method; this is what allows validation to remain
+ // simple. The later invokeDynamic instruction causes it to invoke the actual implementation.
+ assertThat(timerParam.referent(), equalTo(decl));
+ }
+
/**
- * In this particular test, the super class annotated both the timer and the callback,
- * and the subclass overrides an abstract method. This is allowed.
+ * In this particular test, the super class annotated both the timer and the callback, and the
+ * subclass overrides an abstract method. This is allowed.
*/
@Test
public void testOnTimerDeclaredAndUsedInSuperclass() throws Exception {
DoFnSignature sig =
- DoFnSignatures.INSTANCE.getSignature(
- new DoFnOverridingAbstractCallback().getClass());
+ DoFnSignatures.INSTANCE.getSignature(new DoFnOverridingAbstractCallback().getClass());
assertThat(sig.timerDeclarations().size(), equalTo(1));
assertThat(sig.onTimerMethods().size(), equalTo(1));
- DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("my-timer-id");
- DoFnSignature.OnTimerMethod callback = sig.onTimerMethods().get("my-timer-id");
+ DoFnSignature.TimerDeclaration decl =
+ sig.timerDeclarations().get(DoFnDeclaringTimerAndAbstractCallback.TIMER_ID);
+ DoFnSignature.OnTimerMethod callback =
+ sig.onTimerMethods().get(DoFnDeclaringTimerAndAbstractCallback.TIMER_ID);
assertThat(
decl.field(),
- equalTo(DoFnDeclaringMyTimerIdAndAbstractCallback.class.getDeclaredField("myTimerSpec")));
+ equalTo(DoFnDeclaringTimerAndAbstractCallback.class.getDeclaredField("myTimerSpec")));
// The method we pull out is the superclass method; this is what allows validation to remain
// simple. The later invokeDynamic instruction causes it to invoke the actual implementation.
assertThat(
callback.targetMethod(),
- equalTo(DoFnDeclaringMyTimerIdAndAbstractCallback.class.getDeclaredMethod("onMyTimer")));
+ equalTo(DoFnDeclaringTimerAndAbstractCallback.class.getDeclaredMethod("onMyTimer")));
}
@Test
@@ -255,16 +345,18 @@ public class DoFnSignaturesTest {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Duplicate");
thrown.expectMessage("TimerId");
- thrown.expectMessage("my-timer-id");
+ thrown.expectMessage("my-id");
thrown.expectMessage("myfield1");
thrown.expectMessage("myfield2");
+ thrown.expectMessage(not(mentionsState()));
+ thrown.expectMessage(mentionsTimers());
DoFnSignature sig =
DoFnSignatures.INSTANCE.getSignature(
new DoFn<KV<String, Integer>, Long>() {
- @TimerId("my-timer-id")
+ @TimerId("my-id")
private final TimerSpec myfield1 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
- @TimerId("my-timer-id")
+ @TimerId("my-id")
private final TimerSpec myfield2 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
@@ -278,6 +370,8 @@ public class DoFnSignaturesTest {
thrown.expectMessage("Timer declarations must be final");
thrown.expectMessage("Non-final field");
thrown.expectMessage("myfield");
+ thrown.expectMessage(not(mentionsState()));
+ thrown.expectMessage(mentionsTimers());
DoFnSignature sig =
DoFnSignatures.INSTANCE.getSignature(
new DoFn<KV<String, Integer>, Long>() {
@@ -336,10 +430,12 @@ public class DoFnSignaturesTest {
decl.field(), equalTo(DoFnForTestSimpleTimerIdNamedDoFn.class.getDeclaredField("bizzle")));
}
+ @Test
public void testStateIdWithWrongType() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("StateId");
thrown.expectMessage("StateSpec");
+ thrown.expectMessage(not(mentionsTimers()));
DoFnSignatures.INSTANCE.getSignature(
new DoFn<String, String>() {
@StateId("foo")
@@ -355,17 +451,18 @@ public class DoFnSignaturesTest {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Duplicate");
thrown.expectMessage("StateId");
- thrown.expectMessage("my-state-id");
+ thrown.expectMessage("my-id");
thrown.expectMessage("myfield1");
thrown.expectMessage("myfield2");
+ thrown.expectMessage(not(mentionsTimers()));
DoFnSignature sig =
DoFnSignatures.INSTANCE.getSignature(
new DoFn<KV<String, Integer>, Long>() {
- @StateId("my-state-id")
+ @StateId("my-id")
private final StateSpec<Object, ValueState<Integer>> myfield1 =
StateSpecs.value(VarIntCoder.of());
- @StateId("my-state-id")
+ @StateId("my-id")
private final StateSpec<Object, ValueState<Long>> myfield2 =
StateSpecs.value(VarLongCoder.of());
@@ -380,10 +477,11 @@ public class DoFnSignaturesTest {
thrown.expectMessage("State declarations must be final");
thrown.expectMessage("Non-final field");
thrown.expectMessage("myfield");
+ thrown.expectMessage(not(mentionsTimers()));
DoFnSignature sig =
DoFnSignatures.INSTANCE.getSignature(
new DoFn<KV<String, Integer>, Long>() {
- @StateId("my-state-id")
+ @StateId("my-id")
private StateSpec<Object, ValueState<Integer>> myfield =
StateSpecs.value(VarIntCoder.of());
@@ -398,6 +496,7 @@ public class DoFnSignaturesTest {
thrown.expectMessage("missing StateId annotation");
thrown.expectMessage("myProcessElement");
thrown.expectMessage("index 1");
+ thrown.expectMessage(not(mentionsTimers()));
DoFnSignature sig =
DoFnSignatures.INSTANCE.getSignature(
new DoFn<KV<String, Integer>, Long>() {
@@ -411,15 +510,16 @@ public class DoFnSignaturesTest {
public void testStateParameterUndeclared() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("undeclared");
- thrown.expectMessage("my-state-id");
+ thrown.expectMessage("my-id");
thrown.expectMessage("myProcessElement");
thrown.expectMessage("index 1");
+ thrown.expectMessage(not(mentionsTimers()));
DoFnSignature sig =
DoFnSignatures.INSTANCE.getSignature(
new DoFn<KV<String, Integer>, Long>() {
@ProcessElement
public void myProcessElement(
- ProcessContext context, @StateId("my-state-id") ValueState<Integer> undeclared) {}
+ ProcessContext context, @StateId("my-id") ValueState<Integer> undeclared) {}
}.getClass());
}
@@ -427,21 +527,22 @@ public class DoFnSignaturesTest {
public void testStateParameterDuplicate() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("duplicates");
- thrown.expectMessage("my-state-id");
+ thrown.expectMessage("my-id");
thrown.expectMessage("myProcessElement");
thrown.expectMessage("index 2");
+ thrown.expectMessage(not(mentionsTimers()));
DoFnSignature sig =
DoFnSignatures.INSTANCE.getSignature(
new DoFn<KV<String, Integer>, Long>() {
- @StateId("my-state-id")
+ @StateId("my-id")
private final StateSpec<Object, ValueState<Integer>> myfield =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
public void myProcessElement(
ProcessContext context,
- @StateId("my-state-id") ValueState<Integer> one,
- @StateId("my-state-id") ValueState<Integer> two) {}
+ @StateId("my-id") ValueState<Integer> one,
+ @StateId("my-id") ValueState<Integer> two) {}
}.getClass());
}
@@ -451,19 +552,20 @@ public class DoFnSignaturesTest {
thrown.expectMessage("WatermarkHoldState");
thrown.expectMessage("but is a reference to");
thrown.expectMessage("ValueState");
- thrown.expectMessage("my-state-id");
+ thrown.expectMessage("my-id");
thrown.expectMessage("myProcessElement");
thrown.expectMessage("index 1");
+ thrown.expectMessage(not(mentionsTimers()));
DoFnSignature sig =
DoFnSignatures.INSTANCE.getSignature(
new DoFn<KV<String, Integer>, Long>() {
- @StateId("my-state-id")
+ @StateId("my-id")
private final StateSpec<Object, ValueState<Integer>> myfield =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
public void myProcessElement(
- ProcessContext context, @StateId("my-state-id") WatermarkHoldState watermark) {}
+ ProcessContext context, @StateId("my-id") WatermarkHoldState watermark) {}
}.getClass());
}
@@ -473,19 +575,20 @@ public class DoFnSignaturesTest {
thrown.expectMessage("ValueState<java.lang.String>");
thrown.expectMessage("but is a reference to");
thrown.expectMessage("ValueState<java.lang.Integer>");
- thrown.expectMessage("my-state-id");
+ thrown.expectMessage("my-id");
thrown.expectMessage("myProcessElement");
thrown.expectMessage("index 1");
+ thrown.expectMessage(not(mentionsTimers()));
DoFnSignature sig =
DoFnSignatures.INSTANCE.getSignature(
new DoFn<KV<String, Integer>, Long>() {
- @StateId("my-state-id")
+ @StateId("my-id")
private final StateSpec<Object, ValueState<Integer>> myfield =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
public void myProcessElement(
- ProcessContext context, @StateId("my-state-id") ValueState<String> stringState) {}
+ ProcessContext context, @StateId("my-id") ValueState<String> stringState) {}
}.getClass());
}
@@ -573,8 +676,8 @@ public class DoFnSignaturesTest {
}
/**
- * Assuming the proper parsing of declarations, testing elsewhere, this test ensures that
- * a simple reference to such a declaration is correctly resolved.
+ * Assuming the proper parsing of declarations, testing elsewhere, this test ensures that a simple
+ * reference to such a declaration is correctly resolved.
*/
@Test
public void testSimpleStateIdRefAnonymousDoFn() throws Exception {
@@ -592,19 +695,23 @@ public class DoFnSignaturesTest {
assertThat(sig.processElement().extraParameters().size(), equalTo(1));
final DoFnSignature.StateDeclaration decl = sig.stateDeclarations().get("foo");
- sig.processElement().extraParameters().get(0).match(new Parameter.Cases.WithDefault<Void>() {
- @Override
- protected Void dispatchDefault(Parameter p) {
- fail(String.format("Expected a state parameter but got %s", p));
- return null;
- }
-
- @Override
- public Void dispatch(StateParameter stateParam) {
- assertThat(stateParam.referent(), equalTo(decl));
- return null;
- }
- });
+ sig.processElement()
+ .extraParameters()
+ .get(0)
+ .match(
+ new Parameter.Cases.WithDefault<Void>() {
+ @Override
+ protected Void dispatchDefault(Parameter p) {
+ fail(String.format("Expected a state parameter but got %s", p));
+ return null;
+ }
+
+ @Override
+ public Void dispatch(StateParameter stateParam) {
+ assertThat(stateParam.referent(), equalTo(decl));
+ return null;
+ }
+ });
}
@Test
@@ -646,7 +753,7 @@ public class DoFnSignaturesTest {
}
// Test classes at the bottom of the file
- DoFn<KV<String, Integer>, Long> myDoFn = new DoFnForTestGenericStatefulDoFn<Integer>(){};
+ DoFn<KV<String, Integer>, Long> myDoFn = new DoFnForTestGenericStatefulDoFn<Integer>() {};
DoFnSignature sig = DoFnSignatures.INSTANCE.signatureForDoFn(myDoFn);
@@ -661,6 +768,14 @@ public class DoFnSignaturesTest {
Matchers.<TypeDescriptor<?>>equalTo(new TypeDescriptor<ValueState<Integer>>() {}));
}
+ private Matcher<String> mentionsTimers() {
+ return anyOf(containsString("timer"), containsString("Timer"));
+ }
+
+ private Matcher<String> mentionsState() {
+ return anyOf(containsString("state"), containsString("State"));
+ }
+
private abstract static class DoFnDeclaringState extends DoFn<KV<String, Integer>, Long> {
public static final String STATE_ID = "my-state-id";
@@ -672,6 +787,7 @@ public class DoFnSignaturesTest {
private abstract static class DoFnUsingState extends DoFn<KV<String, Integer>, Long> {
public static final String STATE_ID = "my-state-id";
+
@ProcessElement
public void process(ProcessContext context, @StateId(STATE_ID) ValueState<Integer> state) {}
}
@@ -679,6 +795,7 @@ public class DoFnSignaturesTest {
private abstract static class DoFnDeclaringStateAndAbstractUse
extends DoFn<KV<String, Integer>, Long> {
public static final String STATE_ID = "my-state-id";
+
@StateId(STATE_ID)
private final StateSpec<Object, ValueState<String>> myStateSpec =
StateSpecs.value(StringUtf8Coder.of());
@@ -696,28 +813,43 @@ public class DoFnSignaturesTest {
public void foo(ProcessContext context) {}
}
- private abstract static class DoFnUsingMyTimerId extends DoFn<KV<String, Integer>, Long> {
- @OnTimer("my-timer-id")
+ private abstract static class DoFnDeclaringTimerAndCallback
+ extends DoFn<KV<String, Integer>, Long> {
+ public static final String TIMER_ID = "my-timer-id";
+
+ @TimerId(TIMER_ID)
+ private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @OnTimer(TIMER_ID)
+ public void onTimer() {}
+ }
+
+ private abstract static class DoFnWithOnlyCallback extends DoFn<KV<String, Integer>, Long> {
+ public static final String TIMER_ID = "my-timer-id";
+
+ @OnTimer(TIMER_ID)
public void onMyTimer() {}
@ProcessElement
public void foo(ProcessContext context) {}
}
- private abstract static class DoFnDeclaringMyTimerIdAndAbstractCallback
+ private abstract static class DoFnDeclaringTimerAndAbstractCallback
extends DoFn<KV<String, Integer>, Long> {
- @TimerId("my-timer-id")
+ public static final String TIMER_ID = "my-timer-id";
+
+ @TimerId(TIMER_ID)
private final TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void foo(ProcessContext context) {}
- @OnTimer("my-timer-id")
+ @OnTimer(TIMER_ID)
public abstract void onMyTimer();
}
- private static class DoFnOverridingAbstractCallback extends
- DoFnDeclaringMyTimerIdAndAbstractCallback {
+ private static class DoFnOverridingAbstractCallback
+ extends DoFnDeclaringTimerAndAbstractCallback {
@Override
public void onMyTimer() {}
@@ -725,4 +857,27 @@ public class DoFnSignaturesTest {
@ProcessElement
public void foo(ProcessContext context) {}
}
+
+ private abstract static class DoFnDeclaringTimerAndAbstractUse
+ extends DoFn<KV<String, Integer>, Long> {
+ public static final String TIMER_ID = "my-timer-id";
+
+ @TimerId(TIMER_ID)
+ private final TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public abstract void processWithTimer(ProcessContext context, @TimerId(TIMER_ID) Timer timer);
+
+ @OnTimer(TIMER_ID)
+ public abstract void onMyTimer();
+ }
+
+ private static class DoFnOverridingAbstractTimerUse extends DoFnDeclaringTimerAndAbstractUse {
+
+ @Override
+ public void onMyTimer() {}
+
+ @Override
+ public void processWithTimer(ProcessContext context, Timer timer) {}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db66cdbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
index c276692..ce00f2d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
@@ -61,6 +61,7 @@ class DoFnSignaturesTestUtils {
method.getMethod(),
TypeToken.of(Integer.class),
TypeToken.of(String.class),
+ Collections.EMPTY_MAP,
Collections.EMPTY_MAP);
}
}
[2/2] incubator-beam git commit: This closes #1136
Posted by ke...@apache.org.
This closes #1136
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1b47a218
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1b47a218
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1b47a218
Branch: refs/heads/master
Commit: 1b47a2188a133c553c2092546993a40bb965e227
Parents: a69f888 db66cdb
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 20 13:23:12 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 13:23:12 2016 -0700
----------------------------------------------------------------------
.../sdk/transforms/reflect/DoFnSignature.java | 24 ++
.../sdk/transforms/reflect/DoFnSignatures.java | 67 ++++-
.../transforms/reflect/DoFnSignaturesTest.java | 283 ++++++++++++++-----
.../reflect/DoFnSignaturesTestUtils.java | 1 +
4 files changed, 308 insertions(+), 67 deletions(-)
----------------------------------------------------------------------