You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/20 20:32:28 UTC

[1/2] incubator-beam git commit: Add analysis of Timer parameters to DoFn.ProcessElement

Repository: incubator-beam
Updated Branches:
  refs/heads/master a69f888e8 -> 1b47a2188


Add analysis of Timer parameters to DoFn.ProcessElement


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/db66cdbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/db66cdbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/db66cdbe

Branch: refs/heads/master
Commit: db66cdbee77320e7c72c43a5f8dd9e640b61e6bc
Parents: 472cf0e
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Oct 19 15:02:36 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 12:33:19 2016 -0700

----------------------------------------------------------------------
 .../sdk/transforms/reflect/DoFnSignature.java   |  24 ++
 .../sdk/transforms/reflect/DoFnSignatures.java  |  67 ++++-
 .../transforms/reflect/DoFnSignaturesTest.java  | 283 ++++++++++++++-----
 .../reflect/DoFnSignaturesTestUtils.java        |   1 +
 4 files changed, 308 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db66cdbe/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 1dc1fe3..6b9e566 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.DoFn.InputProvider;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.DoFn.StateId;
+import org.apache.beam.sdk.transforms.DoFn.TimerId;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BoundedWindowParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -155,6 +156,8 @@ public abstract class DoFnSignature {
         return cases.dispatch((OutputReceiverParameter) this);
       } else if (this instanceof StateParameter) {
         return cases.dispatch((StateParameter) this);
+      } else if (this instanceof TimerParameter) {
+        return cases.dispatch((TimerParameter) this);
       } else {
         throw new IllegalStateException(
             String.format("Attempt to case match on unknown %s subclass %s",
@@ -171,6 +174,7 @@ public abstract class DoFnSignature {
       ResultT dispatch(OutputReceiverParameter p);
       ResultT dispatch(RestrictionTrackerParameter p);
       ResultT dispatch(StateParameter p);
+      ResultT dispatch(TimerParameter p);
 
       /**
        * A base class for a visitor with a default method for cases it is not interested in.
@@ -203,6 +207,11 @@ public abstract class DoFnSignature {
         public ResultT dispatch(StateParameter p) {
           return dispatchDefault(p);
         }
+
+        @Override
+        public ResultT dispatch(TimerParameter p) {
+          return dispatchDefault(p);
+        }
       }
     }
 
@@ -251,6 +260,10 @@ public abstract class DoFnSignature {
       return new AutoValue_DoFnSignature_Parameter_StateParameter(decl);
     }
 
+    public static TimerParameter timerParameter(TimerDeclaration decl) {
+      return new AutoValue_DoFnSignature_Parameter_TimerParameter(decl);
+    }
+
     /**
      * Descriptor for a {@link Parameter} of type {@link BoundedWindow}.
      *
@@ -304,6 +317,17 @@ public abstract class DoFnSignature {
       StateParameter() {}
       public abstract StateDeclaration referent();
     }
+
+    /**
+     * Descriptor for a {@link Parameter} of type {@link Timer}, with an id indicated by
+     * its {@link TimerId} annotation.
+     */
+    @AutoValue
+    public abstract static class TimerParameter extends Parameter {
+      // Package visible for AutoValue
+      TimerParameter() {}
+      public abstract TimerDeclaration referent();
+    }
   }
 
   /** Describes a {@link DoFn.ProcessElement} method. */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db66cdbe/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 038b55d..ea1203a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -43,12 +43,14 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.TimerId;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.util.state.State;
@@ -166,8 +168,13 @@ public class DoFnSignatures {
         errors.forMethod(DoFn.ProcessElement.class, processElementMethod);
     DoFnSignature.ProcessElementMethod processElement =
         analyzeProcessElementMethod(
-            processElementErrors, fnToken, processElementMethod, inputT, outputT,
-            stateDeclarations);
+            processElementErrors,
+            fnToken,
+            processElementMethod,
+            inputT,
+            outputT,
+            stateDeclarations,
+            timerDeclarations);
     builder.setProcessElement(processElement);
 
     if (startBundleMethod != null) {
@@ -447,7 +454,8 @@ public class DoFnSignatures {
       Method m,
       TypeToken<?> inputT,
       TypeToken<?> outputT,
-      Map<String, StateDeclaration> stateDeclarations) {
+      Map<String, StateDeclaration> stateDeclarations,
+      Map<String, TimerDeclaration> timerDeclarations) {
     errors.checkArgument(
         void.class.equals(m.getReturnType())
             || DoFn.ProcessContinuation.class.equals(m.getReturnType()),
@@ -468,6 +476,7 @@ public class DoFnSignatures {
 
     List<DoFnSignature.Parameter> extraParameters = new ArrayList<>();
     Map<String, DoFnSignature.Parameter> stateParameters = new HashMap<>();
+    Map<String, DoFnSignature.Parameter> timerParameters = new HashMap<>();
     TypeToken<?> trackerT = null;
 
     TypeToken<?> expectedInputProviderT = inputProviderTypeOf(inputT);
@@ -505,6 +514,58 @@ public class DoFnSignatures {
             formatType(paramT),
             formatType(expectedOutputReceiverT));
         extraParameters.add(DoFnSignature.Parameter.outputReceiver());
+      } else if (Timer.class.equals(rawType)) {
+        // m.getParameters() is not available until Java 8
+        Annotation[] annotations = m.getParameterAnnotations()[i];
+        String id = null;
+        for (Annotation anno : annotations) {
+          if (anno.annotationType().equals(DoFn.TimerId.class)) {
+            id = ((DoFn.TimerId) anno).value();
+            break;
+          }
+        }
+        errors.checkArgument(
+            id != null,
+            "%s parameter of type %s at index %s missing %s annotation",
+            fnClass.getRawType().getName(),
+            params[i],
+            i,
+            DoFn.TimerId.class.getSimpleName());
+
+        errors.checkArgument(
+            !timerParameters.containsKey(id),
+            "%s parameter of type %s at index %s duplicates %s(\"%s\") on other parameter",
+            fnClass.getRawType().getName(),
+            params[i],
+            i,
+            DoFn.TimerId.class.getSimpleName(),
+            id);
+
+        TimerDeclaration timerDecl = timerDeclarations.get(id);
+        errors.checkArgument(
+            timerDecl != null,
+            "%s parameter of type %s at index %s references undeclared %s \"%s\"",
+            fnClass.getRawType().getName(),
+            params[i],
+            i,
+            TimerId.class.getSimpleName(),
+            id);
+
+        errors.checkArgument(
+            timerDecl.field().getDeclaringClass().equals(m.getDeclaringClass()),
+            "Method %s has %s parameter at index %s for timer %s"
+                + " declared in a different class %s."
+                + " Timers may be referenced only in the lexical scope where they are declared.",
+            m,
+            Timer.class.getSimpleName(),
+            i,
+            id,
+            timerDecl.field().getDeclaringClass().getName());
+
+        DoFnSignature.Parameter.TimerParameter timerParameter = Parameter.timerParameter(timerDecl);
+        timerParameters.put(id, timerParameter);
+        extraParameters.add(timerParameter);
+
       } else if (RestrictionTracker.class.isAssignableFrom(rawType)) {
         errors.checkArgument(
             !extraParameters.contains(DoFnSignature.Parameter.restrictionTracker()),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db66cdbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index ad58e80..fe88c3b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -18,7 +18,10 @@
 package org.apache.beam.sdk.transforms.reflect;
 
 import static org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.errors;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -27,10 +30,13 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.OnTimer;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
 import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.state.StateSpec;
@@ -39,6 +45,7 @@ import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
@@ -145,6 +152,7 @@ public class DoFnSignaturesTest {
     thrown.expectMessage("TimerId");
     thrown.expectMessage("TimerSpec");
     thrown.expectMessage("bizzle");
+    thrown.expectMessage(not(mentionsState()));
     DoFnSignatures.INSTANCE.getSignature(
         new DoFn<String, String>() {
           @TimerId("foo")
@@ -159,11 +167,13 @@ public class DoFnSignaturesTest {
   public void testTimerIdNoCallback() throws Exception {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("No callback registered");
-    thrown.expectMessage("my-timer-id");
+    thrown.expectMessage("my-id");
+    thrown.expectMessage(not(mentionsState()));
+    thrown.expectMessage(mentionsTimers());
     DoFnSignature sig =
         DoFnSignatures.INSTANCE.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
-              @TimerId("my-timer-id")
+              @TimerId("my-id")
               private final TimerSpec myfield1 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
               @ProcessElement
@@ -176,13 +186,15 @@ public class DoFnSignaturesTest {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("Callback");
     thrown.expectMessage("undeclared timer");
-    thrown.expectMessage("onTimerFoo");
-    thrown.expectMessage("my-timer-id");
+    thrown.expectMessage("onFoo");
+    thrown.expectMessage("my-id");
+    thrown.expectMessage(not(mentionsState()));
+    thrown.expectMessage(mentionsTimers());
     DoFnSignature sig =
         DoFnSignatures.INSTANCE.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
-              @OnTimer("my-timer-id")
-              public void onTimerFoo() {}
+              @OnTimer("my-id")
+              public void onFoo() {}
 
               @ProcessElement
               public void foo(ProcessContext context) {}
@@ -191,18 +203,72 @@ public class DoFnSignaturesTest {
 
   @Test
   public void testOnTimerDeclaredInSuperclass() throws Exception {
+    class DoFnDeclaringTimerAndProcessElement extends DoFn<KV<String, Integer>, Long> {
+      public static final String TIMER_ID = "my-timer-id";
+
+      @TimerId(TIMER_ID)
+      private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+      @ProcessElement
+      public void foo(ProcessContext context) {}
+    }
+
+    DoFnDeclaringTimerAndProcessElement fn =
+        new DoFnDeclaringTimerAndProcessElement() {
+          @OnTimer(DoFnDeclaringTimerAndProcessElement.TIMER_ID)
+          public void onTimerFoo() {}
+        };
+
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("Callback");
     thrown.expectMessage("declared in a different class");
-    thrown.expectMessage("my-timer-id");
+    thrown.expectMessage(DoFnDeclaringTimerAndProcessElement.TIMER_ID);
+    thrown.expectMessage(fn.getClass().getSimpleName());
+    thrown.expectMessage(not(mentionsState()));
+    thrown.expectMessage(mentionsTimers());
+    DoFnSignature sig = DoFnSignatures.INSTANCE.getSignature(fn.getClass());
+  }
+
+  @Test
+  public void testUsageOfTimerDeclaredInSuperclass() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("process");
+    thrown.expectMessage("declared in a different class");
+    thrown.expectMessage(DoFnDeclaringTimerAndCallback.TIMER_ID);
+    thrown.expectMessage(not(mentionsState()));
+    thrown.expectMessage(mentionsTimers());
     DoFnSignature sig =
         DoFnSignatures.INSTANCE.getSignature(
-            new DoFnDeclaringMyTimerId() {
-              @OnTimer("my-timer-id")
-              public void onTimerFoo() {}
+            new DoFnDeclaringTimerAndCallback() {
+              @ProcessElement
+              public void process(
+                  ProcessContext context,
+                  @TimerId(DoFnDeclaringTimerAndCallback.TIMER_ID) Timer timer) {}
+            }.getClass());
+  }
+
+  @Test
+  public void testTimerParameterDuplicate() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("duplicates");
+    thrown.expectMessage("my-id");
+    thrown.expectMessage("myProcessElement");
+    thrown.expectMessage("index 2");
+    thrown.expectMessage(not(mentionsState()));
+    DoFnSignature sig =
+        DoFnSignatures.INSTANCE.getSignature(
+            new DoFn<KV<String, Integer>, Long>() {
+              @TimerId("my-id")
+              private final TimerSpec myfield = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
 
               @ProcessElement
-              public void foo(ProcessContext context) {}
+              public void myProcessElement(
+                  ProcessContext context,
+                  @TimerId("my-id") Timer one,
+                  @TimerId("my-id") Timer two) {}
+
+              @OnTimer("my-id")
+              public void onWhatever() {}
             }.getClass());
   }
 
@@ -211,11 +277,13 @@ public class DoFnSignaturesTest {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("Callback");
     thrown.expectMessage("declared in a different class");
-    thrown.expectMessage("my-timer-id");
+    thrown.expectMessage(DoFnWithOnlyCallback.TIMER_ID);
+    thrown.expectMessage(not(mentionsState()));
+    thrown.expectMessage(mentionsTimers());
     DoFnSignature sig =
         DoFnSignatures.INSTANCE.getSignature(
-            new DoFnUsingMyTimerId() {
-              @TimerId("my-timer-id")
+            new DoFnWithOnlyCallback() {
+              @TimerId(DoFnWithOnlyCallback.TIMER_ID)
               private final TimerSpec myfield1 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
               @ProcessElement
@@ -223,31 +291,53 @@ public class DoFnSignaturesTest {
             }.getClass());
   }
 
+  @Test
+  public void testDeclAndUsageOfTimerInSuperclass() throws Exception {
+    DoFnSignature sig =
+        DoFnSignatures.INSTANCE.getSignature(new DoFnOverridingAbstractTimerUse().getClass());
+
+    assertThat(sig.timerDeclarations().size(), equalTo(1));
+    assertThat(sig.processElement().extraParameters().size(), equalTo(1));
+
+    DoFnSignature.TimerDeclaration decl =
+        sig.timerDeclarations().get(DoFnOverridingAbstractTimerUse.TIMER_ID);
+    TimerParameter timerParam = (TimerParameter) sig.processElement().extraParameters().get(0);
+
+    assertThat(
+        decl.field(),
+        equalTo(DoFnDeclaringTimerAndAbstractUse.class.getDeclaredField("myTimerSpec")));
+
+    // The method we pull out is the superclass method; this is what allows validation to remain
+    // simple. The later invokeDynamic instruction causes it to invoke the actual implementation.
+    assertThat(timerParam.referent(), equalTo(decl));
+  }
+
   /**
-   * In this particular test, the super class annotated both the timer and the callback,
-   * and the subclass overrides an abstract method. This is allowed.
+   * In this particular test, the super class annotated both the timer and the callback, and the
+   * subclass overrides an abstract method. This is allowed.
    */
   @Test
   public void testOnTimerDeclaredAndUsedInSuperclass() throws Exception {
     DoFnSignature sig =
-        DoFnSignatures.INSTANCE.getSignature(
-            new DoFnOverridingAbstractCallback().getClass());
+        DoFnSignatures.INSTANCE.getSignature(new DoFnOverridingAbstractCallback().getClass());
 
     assertThat(sig.timerDeclarations().size(), equalTo(1));
     assertThat(sig.onTimerMethods().size(), equalTo(1));
 
-    DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("my-timer-id");
-    DoFnSignature.OnTimerMethod callback = sig.onTimerMethods().get("my-timer-id");
+    DoFnSignature.TimerDeclaration decl =
+        sig.timerDeclarations().get(DoFnDeclaringTimerAndAbstractCallback.TIMER_ID);
+    DoFnSignature.OnTimerMethod callback =
+        sig.onTimerMethods().get(DoFnDeclaringTimerAndAbstractCallback.TIMER_ID);
 
     assertThat(
         decl.field(),
-        equalTo(DoFnDeclaringMyTimerIdAndAbstractCallback.class.getDeclaredField("myTimerSpec")));
+        equalTo(DoFnDeclaringTimerAndAbstractCallback.class.getDeclaredField("myTimerSpec")));
 
     // The method we pull out is the superclass method; this is what allows validation to remain
     // simple. The later invokeDynamic instruction causes it to invoke the actual implementation.
     assertThat(
         callback.targetMethod(),
-        equalTo(DoFnDeclaringMyTimerIdAndAbstractCallback.class.getDeclaredMethod("onMyTimer")));
+        equalTo(DoFnDeclaringTimerAndAbstractCallback.class.getDeclaredMethod("onMyTimer")));
   }
 
   @Test
@@ -255,16 +345,18 @@ public class DoFnSignaturesTest {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("Duplicate");
     thrown.expectMessage("TimerId");
-    thrown.expectMessage("my-timer-id");
+    thrown.expectMessage("my-id");
     thrown.expectMessage("myfield1");
     thrown.expectMessage("myfield2");
+    thrown.expectMessage(not(mentionsState()));
+    thrown.expectMessage(mentionsTimers());
     DoFnSignature sig =
         DoFnSignatures.INSTANCE.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
-              @TimerId("my-timer-id")
+              @TimerId("my-id")
               private final TimerSpec myfield1 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
-              @TimerId("my-timer-id")
+              @TimerId("my-id")
               private final TimerSpec myfield2 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
               @ProcessElement
@@ -278,6 +370,8 @@ public class DoFnSignaturesTest {
     thrown.expectMessage("Timer declarations must be final");
     thrown.expectMessage("Non-final field");
     thrown.expectMessage("myfield");
+    thrown.expectMessage(not(mentionsState()));
+    thrown.expectMessage(mentionsTimers());
     DoFnSignature sig =
         DoFnSignatures.INSTANCE.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
@@ -336,10 +430,12 @@ public class DoFnSignaturesTest {
         decl.field(), equalTo(DoFnForTestSimpleTimerIdNamedDoFn.class.getDeclaredField("bizzle")));
   }
 
+  @Test
   public void testStateIdWithWrongType() throws Exception {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("StateId");
     thrown.expectMessage("StateSpec");
+    thrown.expectMessage(not(mentionsTimers()));
     DoFnSignatures.INSTANCE.getSignature(
         new DoFn<String, String>() {
           @StateId("foo")
@@ -355,17 +451,18 @@ public class DoFnSignaturesTest {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("Duplicate");
     thrown.expectMessage("StateId");
-    thrown.expectMessage("my-state-id");
+    thrown.expectMessage("my-id");
     thrown.expectMessage("myfield1");
     thrown.expectMessage("myfield2");
+    thrown.expectMessage(not(mentionsTimers()));
     DoFnSignature sig =
         DoFnSignatures.INSTANCE.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
-              @StateId("my-state-id")
+              @StateId("my-id")
               private final StateSpec<Object, ValueState<Integer>> myfield1 =
                   StateSpecs.value(VarIntCoder.of());
 
-              @StateId("my-state-id")
+              @StateId("my-id")
               private final StateSpec<Object, ValueState<Long>> myfield2 =
                   StateSpecs.value(VarLongCoder.of());
 
@@ -380,10 +477,11 @@ public class DoFnSignaturesTest {
     thrown.expectMessage("State declarations must be final");
     thrown.expectMessage("Non-final field");
     thrown.expectMessage("myfield");
+    thrown.expectMessage(not(mentionsTimers()));
     DoFnSignature sig =
         DoFnSignatures.INSTANCE.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
-              @StateId("my-state-id")
+              @StateId("my-id")
               private StateSpec<Object, ValueState<Integer>> myfield =
                   StateSpecs.value(VarIntCoder.of());
 
@@ -398,6 +496,7 @@ public class DoFnSignaturesTest {
     thrown.expectMessage("missing StateId annotation");
     thrown.expectMessage("myProcessElement");
     thrown.expectMessage("index 1");
+    thrown.expectMessage(not(mentionsTimers()));
     DoFnSignature sig =
         DoFnSignatures.INSTANCE.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
@@ -411,15 +510,16 @@ public class DoFnSignaturesTest {
   public void testStateParameterUndeclared() throws Exception {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("undeclared");
-    thrown.expectMessage("my-state-id");
+    thrown.expectMessage("my-id");
     thrown.expectMessage("myProcessElement");
     thrown.expectMessage("index 1");
+    thrown.expectMessage(not(mentionsTimers()));
     DoFnSignature sig =
         DoFnSignatures.INSTANCE.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
               @ProcessElement
               public void myProcessElement(
-                  ProcessContext context, @StateId("my-state-id") ValueState<Integer> undeclared) {}
+                  ProcessContext context, @StateId("my-id") ValueState<Integer> undeclared) {}
             }.getClass());
   }
 
@@ -427,21 +527,22 @@ public class DoFnSignaturesTest {
   public void testStateParameterDuplicate() throws Exception {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("duplicates");
-    thrown.expectMessage("my-state-id");
+    thrown.expectMessage("my-id");
     thrown.expectMessage("myProcessElement");
     thrown.expectMessage("index 2");
+    thrown.expectMessage(not(mentionsTimers()));
     DoFnSignature sig =
         DoFnSignatures.INSTANCE.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
-              @StateId("my-state-id")
+              @StateId("my-id")
               private final StateSpec<Object, ValueState<Integer>> myfield =
                   StateSpecs.value(VarIntCoder.of());
 
               @ProcessElement
               public void myProcessElement(
                   ProcessContext context,
-                  @StateId("my-state-id") ValueState<Integer> one,
-                  @StateId("my-state-id") ValueState<Integer> two) {}
+                  @StateId("my-id") ValueState<Integer> one,
+                  @StateId("my-id") ValueState<Integer> two) {}
             }.getClass());
   }
 
@@ -451,19 +552,20 @@ public class DoFnSignaturesTest {
     thrown.expectMessage("WatermarkHoldState");
     thrown.expectMessage("but is a reference to");
     thrown.expectMessage("ValueState");
-    thrown.expectMessage("my-state-id");
+    thrown.expectMessage("my-id");
     thrown.expectMessage("myProcessElement");
     thrown.expectMessage("index 1");
+    thrown.expectMessage(not(mentionsTimers()));
     DoFnSignature sig =
         DoFnSignatures.INSTANCE.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
-              @StateId("my-state-id")
+              @StateId("my-id")
               private final StateSpec<Object, ValueState<Integer>> myfield =
                   StateSpecs.value(VarIntCoder.of());
 
               @ProcessElement
               public void myProcessElement(
-                  ProcessContext context, @StateId("my-state-id") WatermarkHoldState watermark) {}
+                  ProcessContext context, @StateId("my-id") WatermarkHoldState watermark) {}
             }.getClass());
   }
 
@@ -473,19 +575,20 @@ public class DoFnSignaturesTest {
     thrown.expectMessage("ValueState<java.lang.String>");
     thrown.expectMessage("but is a reference to");
     thrown.expectMessage("ValueState<java.lang.Integer>");
-    thrown.expectMessage("my-state-id");
+    thrown.expectMessage("my-id");
     thrown.expectMessage("myProcessElement");
     thrown.expectMessage("index 1");
+    thrown.expectMessage(not(mentionsTimers()));
     DoFnSignature sig =
         DoFnSignatures.INSTANCE.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
-              @StateId("my-state-id")
+              @StateId("my-id")
               private final StateSpec<Object, ValueState<Integer>> myfield =
                   StateSpecs.value(VarIntCoder.of());
 
               @ProcessElement
               public void myProcessElement(
-                  ProcessContext context, @StateId("my-state-id") ValueState<String> stringState) {}
+                  ProcessContext context, @StateId("my-id") ValueState<String> stringState) {}
             }.getClass());
   }
 
@@ -573,8 +676,8 @@ public class DoFnSignaturesTest {
   }
 
   /**
-   * Assuming the proper parsing of declarations, testing elsewhere, this test ensures that
-   * a simple reference to such a declaration is correctly resolved.
+   * Assuming the proper parsing of declarations, testing elsewhere, this test ensures that a simple
+   * reference to such a declaration is correctly resolved.
    */
   @Test
   public void testSimpleStateIdRefAnonymousDoFn() throws Exception {
@@ -592,19 +695,23 @@ public class DoFnSignaturesTest {
     assertThat(sig.processElement().extraParameters().size(), equalTo(1));
 
     final DoFnSignature.StateDeclaration decl = sig.stateDeclarations().get("foo");
-    sig.processElement().extraParameters().get(0).match(new Parameter.Cases.WithDefault<Void>() {
-      @Override
-      protected Void dispatchDefault(Parameter p) {
-        fail(String.format("Expected a state parameter but got %s", p));
-        return null;
-      }
-
-      @Override
-      public Void dispatch(StateParameter stateParam) {
-        assertThat(stateParam.referent(), equalTo(decl));
-        return null;
-      }
-    });
+    sig.processElement()
+        .extraParameters()
+        .get(0)
+        .match(
+            new Parameter.Cases.WithDefault<Void>() {
+              @Override
+              protected Void dispatchDefault(Parameter p) {
+                fail(String.format("Expected a state parameter but got %s", p));
+                return null;
+              }
+
+              @Override
+              public Void dispatch(StateParameter stateParam) {
+                assertThat(stateParam.referent(), equalTo(decl));
+                return null;
+              }
+            });
   }
 
   @Test
@@ -646,7 +753,7 @@ public class DoFnSignaturesTest {
     }
 
     // Test classes at the bottom of the file
-    DoFn<KV<String, Integer>, Long> myDoFn = new DoFnForTestGenericStatefulDoFn<Integer>(){};
+    DoFn<KV<String, Integer>, Long> myDoFn = new DoFnForTestGenericStatefulDoFn<Integer>() {};
 
     DoFnSignature sig = DoFnSignatures.INSTANCE.signatureForDoFn(myDoFn);
 
@@ -661,6 +768,14 @@ public class DoFnSignaturesTest {
         Matchers.<TypeDescriptor<?>>equalTo(new TypeDescriptor<ValueState<Integer>>() {}));
   }
 
+  private Matcher<String> mentionsTimers() {
+    return anyOf(containsString("timer"), containsString("Timer"));
+  }
+
+  private Matcher<String> mentionsState() {
+    return anyOf(containsString("state"), containsString("State"));
+  }
+
   private abstract static class DoFnDeclaringState extends DoFn<KV<String, Integer>, Long> {
 
     public static final String STATE_ID = "my-state-id";
@@ -672,6 +787,7 @@ public class DoFnSignaturesTest {
 
   private abstract static class DoFnUsingState extends DoFn<KV<String, Integer>, Long> {
     public static final String STATE_ID = "my-state-id";
+
     @ProcessElement
     public void process(ProcessContext context, @StateId(STATE_ID) ValueState<Integer> state) {}
   }
@@ -679,6 +795,7 @@ public class DoFnSignaturesTest {
   private abstract static class DoFnDeclaringStateAndAbstractUse
       extends DoFn<KV<String, Integer>, Long> {
     public static final String STATE_ID = "my-state-id";
+
     @StateId(STATE_ID)
     private final StateSpec<Object, ValueState<String>> myStateSpec =
         StateSpecs.value(StringUtf8Coder.of());
@@ -696,28 +813,43 @@ public class DoFnSignaturesTest {
     public void foo(ProcessContext context) {}
   }
 
-  private abstract static class DoFnUsingMyTimerId extends DoFn<KV<String, Integer>, Long> {
-    @OnTimer("my-timer-id")
+  private abstract static class DoFnDeclaringTimerAndCallback
+      extends DoFn<KV<String, Integer>, Long> {
+    public static final String TIMER_ID = "my-timer-id";
+
+    @TimerId(TIMER_ID)
+    private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @OnTimer(TIMER_ID)
+    public void onTimer() {}
+  }
+
+  private abstract static class DoFnWithOnlyCallback extends DoFn<KV<String, Integer>, Long> {
+    public static final String TIMER_ID = "my-timer-id";
+
+    @OnTimer(TIMER_ID)
     public void onMyTimer() {}
 
     @ProcessElement
     public void foo(ProcessContext context) {}
   }
 
-  private abstract static class DoFnDeclaringMyTimerIdAndAbstractCallback
+  private abstract static class DoFnDeclaringTimerAndAbstractCallback
       extends DoFn<KV<String, Integer>, Long> {
-    @TimerId("my-timer-id")
+    public static final String TIMER_ID = "my-timer-id";
+
+    @TimerId(TIMER_ID)
     private final TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
     @ProcessElement
     public void foo(ProcessContext context) {}
 
-    @OnTimer("my-timer-id")
+    @OnTimer(TIMER_ID)
     public abstract void onMyTimer();
   }
 
-  private static class DoFnOverridingAbstractCallback extends
-      DoFnDeclaringMyTimerIdAndAbstractCallback {
+  private static class DoFnOverridingAbstractCallback
+      extends DoFnDeclaringTimerAndAbstractCallback {
 
     @Override
     public void onMyTimer() {}
@@ -725,4 +857,27 @@ public class DoFnSignaturesTest {
     @ProcessElement
     public void foo(ProcessContext context) {}
   }
+
+  private abstract static class DoFnDeclaringTimerAndAbstractUse
+      extends DoFn<KV<String, Integer>, Long> {
+    public static final String TIMER_ID = "my-timer-id";
+
+    @TimerId(TIMER_ID)
+    private final TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @ProcessElement
+    public abstract void processWithTimer(ProcessContext context, @TimerId(TIMER_ID) Timer timer);
+
+    @OnTimer(TIMER_ID)
+    public abstract void onMyTimer();
+  }
+
+  private static class DoFnOverridingAbstractTimerUse extends DoFnDeclaringTimerAndAbstractUse {
+
+    @Override
+    public void onMyTimer() {}
+
+    @Override
+    public void processWithTimer(ProcessContext context, Timer timer) {}
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db66cdbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
index c276692..ce00f2d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
@@ -61,6 +61,7 @@ class DoFnSignaturesTestUtils {
         method.getMethod(),
         TypeToken.of(Integer.class),
         TypeToken.of(String.class),
+        Collections.EMPTY_MAP,
         Collections.EMPTY_MAP);
   }
 }


[2/2] incubator-beam git commit: This closes #1136

Posted by ke...@apache.org.
This closes #1136


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1b47a218
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1b47a218
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1b47a218

Branch: refs/heads/master
Commit: 1b47a2188a133c553c2092546993a40bb965e227
Parents: a69f888 db66cdb
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 20 13:23:12 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 13:23:12 2016 -0700

----------------------------------------------------------------------
 .../sdk/transforms/reflect/DoFnSignature.java   |  24 ++
 .../sdk/transforms/reflect/DoFnSignatures.java  |  67 ++++-
 .../transforms/reflect/DoFnSignaturesTest.java  | 283 ++++++++++++++-----
 .../reflect/DoFnSignaturesTestUtils.java        |   1 +
 4 files changed, 308 insertions(+), 67 deletions(-)
----------------------------------------------------------------------