You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/08 03:30:12 UTC

[2/5] incubator-beam git commit: Add OnTimerContext parameter support to DoFnSignature

Add OnTimerContext parameter support to DoFnSignature


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/42b506f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/42b506f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/42b506f0

Branch: refs/heads/master
Commit: 42b506f06dbd73e03a2cfad4e7677e9698b3c020
Parents: 3f8c807
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 20:18:18 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Dec 7 19:22:43 2016 -0800

----------------------------------------------------------------------
 .../reflect/ByteBuddyDoFnInvokerFactory.java    |  6 ++
 .../sdk/transforms/reflect/DoFnSignature.java   | 26 +++++-
 .../sdk/transforms/reflect/DoFnSignatures.java  | 90 ++++++++++++++++----
 .../DoFnSignaturesSplittableDoFnTest.java       |  3 +-
 .../transforms/reflect/DoFnSignaturesTest.java  | 47 ++++++++++
 5 files changed, 154 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42b506f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 8750d64..3480603 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -69,6 +69,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ContextParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.InputProviderParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OnTimerContextParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ProcessContextParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
@@ -554,6 +555,11 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
           }
 
           @Override
+          public StackManipulation dispatch(OnTimerContextParameter p) {
+            throw new UnsupportedOperationException("OnTimerContext is not yet supported.");
+          }
+
+          @Override
           public StackManipulation dispatch(WindowParameter p) {
             return new StackManipulation.Compound(
                 simpleExtraContextParameter(WINDOW_PARAMETER_METHOD),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42b506f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 0750949..ccc9ac3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -175,6 +175,8 @@ public abstract class DoFnSignature {
         return cases.dispatch((ContextParameter) this);
       } else if (this instanceof ProcessContextParameter) {
         return cases.dispatch((ProcessContextParameter) this);
+      } else if (this instanceof OnTimerContextParameter) {
+        return cases.dispatch((OnTimerContextParameter) this);
       } else if (this instanceof WindowParameter) {
         return cases.dispatch((WindowParameter) this);
       } else if (this instanceof RestrictionTrackerParameter) {
@@ -200,6 +202,7 @@ public abstract class DoFnSignature {
     public interface Cases<ResultT> {
       ResultT dispatch(ContextParameter p);
       ResultT dispatch(ProcessContextParameter p);
+      ResultT dispatch(OnTimerContextParameter p);
       ResultT dispatch(WindowParameter p);
       ResultT dispatch(InputProviderParameter p);
       ResultT dispatch(OutputReceiverParameter p);
@@ -225,6 +228,11 @@ public abstract class DoFnSignature {
         }
 
         @Override
+        public ResultT dispatch(OnTimerContextParameter p) {
+          return dispatchDefault(p);
+        }
+
+        @Override
         public ResultT dispatch(WindowParameter p) {
           return dispatchDefault(p);
         }
@@ -261,12 +269,14 @@ public abstract class DoFnSignature {
         new AutoValue_DoFnSignature_Parameter_ContextParameter();
     private static final ProcessContextParameter PROCESS_CONTEXT_PARAMETER =
           new AutoValue_DoFnSignature_Parameter_ProcessContextParameter();
+    private static final OnTimerContextParameter ON_TIMER_CONTEXT_PARAMETER =
+        new AutoValue_DoFnSignature_Parameter_OnTimerContextParameter();
     private static final InputProviderParameter INPUT_PROVIDER_PARAMETER =
         new AutoValue_DoFnSignature_Parameter_InputProviderParameter();
     private static final OutputReceiverParameter OUTPUT_RECEIVER_PARAMETER =
         new AutoValue_DoFnSignature_Parameter_OutputReceiverParameter();
 
-    /** Returns a {@link ProcessContextParameter}. */
+    /** Returns a {@link ContextParameter}. */
     public static ContextParameter context() {
       return CONTEXT_PARAMETER;
     }
@@ -276,6 +286,11 @@ public abstract class DoFnSignature {
       return PROCESS_CONTEXT_PARAMETER;
     }
 
+    /** Returns a {@link OnTimerContextParameter}. */
+    public static OnTimerContextParameter onTimerContext() {
+      return ON_TIMER_CONTEXT_PARAMETER;
+    }
+
     /** Returns a {@link WindowParameter}. */
     public static WindowParameter boundedWindow(TypeDescriptor<? extends BoundedWindow> windowT) {
       return new AutoValue_DoFnSignature_Parameter_WindowParameter(windowT);
@@ -334,6 +349,15 @@ public abstract class DoFnSignature {
     }
 
     /**
+     * Descriptor for a {@link Parameter} of type {@link DoFn.OnTimerContext}.
+     *
+     * <p>All such descriptors are equal.
+     */
+    @AutoValue
+    public abstract static class OnTimerContextParameter extends Parameter {
+      OnTimerContextParameter() {}
+    }
+    /**
      * Descriptor for a {@link Parameter} of type {@link BoundedWindow}.
      *
      * <p>All such descriptors are equal.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42b506f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 83d67b7..e3ba966 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkState;
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -47,7 +48,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ProcessContextParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
@@ -74,6 +74,29 @@ public class DoFnSignatures {
 
   private static final Map<Class<?>, DoFnSignature> signatureCache = new LinkedHashMap<>();
 
+  private static final Collection<Class<? extends Parameter>>
+      ALLOWED_NON_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS =
+      ImmutableList.of(
+          Parameter.ProcessContextParameter.class,
+          Parameter.WindowParameter.class,
+          Parameter.TimerParameter.class,
+          Parameter.StateParameter.class,
+          Parameter.InputProviderParameter.class,
+          Parameter.OutputReceiverParameter.class);
+
+  private static final Collection<Class<? extends Parameter>>
+      ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS =
+          ImmutableList.of(
+              Parameter.ProcessContextParameter.class, Parameter.RestrictionTrackerParameter.class);
+
+  private static final Collection<Class<? extends Parameter>>
+      ALLOWED_ON_TIMER_PARAMETERS =
+          ImmutableList.of(
+              Parameter.OnTimerContextParameter.class,
+              Parameter.WindowParameter.class,
+              Parameter.TimerParameter.class,
+              Parameter.StateParameter.class);
+
   /** @return the {@link DoFnSignature} for the given {@link DoFn} instance. */
   public static <FnT extends DoFn<?, ?>> DoFnSignature signatureForDoFn(FnT fn) {
     return getSignature(fn.getClass());
@@ -583,6 +606,18 @@ public class DoFnSignatures {
   }
 
   /**
+   * Generates a {@link TypeDescriptor} for {@code DoFn<InputT, OutputT>.Context} given {@code
+   * InputT} and {@code OutputT}.
+   */
+  private static <InputT, OutputT>
+      TypeDescriptor<DoFn<InputT, OutputT>.OnTimerContext> doFnOnTimerContextTypeOf(
+          TypeDescriptor<InputT> inputT, TypeDescriptor<OutputT> outputT) {
+    return new TypeDescriptor<DoFn<InputT, OutputT>.OnTimerContext>() {}.where(
+            new TypeParameter<InputT>() {}, inputT)
+        .where(new TypeParameter<OutputT>() {}, outputT);
+  }
+
+  /**
    * Generates a {@link TypeDescriptor} for {@code DoFn.InputProvider<InputT>} given {@code InputT}.
    */
   private static <InputT> TypeDescriptor<DoFn.InputProvider<InputT>> inputProviderTypeOf(
@@ -621,7 +656,7 @@ public class DoFnSignatures {
     List<DoFnSignature.Parameter> extraParameters = new ArrayList<>();
     ErrorReporter onTimerErrors = errors.forMethod(DoFn.OnTimer.class, m);
     for (int i = 0; i < params.length; ++i) {
-      extraParameters.add(
+      Parameter parameter =
           analyzeExtraParameter(
               onTimerErrors,
               fnContext,
@@ -633,7 +668,14 @@ public class DoFnSignatures {
                   fnClass.resolveType(params[i]),
                   Arrays.asList(m.getParameterAnnotations()[i])),
               inputT,
-              outputT));
+              outputT);
+
+      checkParameterOneOf(
+          errors,
+          parameter,
+          ALLOWED_ON_TIMER_PARAMETERS);
+
+      extraParameters.add(parameter);
     }
 
     return DoFnSignature.OnTimerMethod.create(m, timerId, windowT, extraParameters);
@@ -679,20 +721,15 @@ public class DoFnSignatures {
       methodContext.addParameter(extraParam);
     }
 
-    // A splittable DoFn can not have any other extra context parameters.
+    // The allowed parameters depend on whether this DoFn is splittable
     if (methodContext.hasRestrictionTrackerParameter()) {
-      errors.checkArgument(
-          Iterables.all(
-              methodContext.getExtraParameters(),
-              Predicates.or(
-                  Predicates.instanceOf(RestrictionTrackerParameter.class),
-                  Predicates.instanceOf(ProcessContextParameter.class))),
-          "Splittable %s @%s must have only %s and %s parameters, but has: %s",
-          DoFn.class.getSimpleName(),
-          DoFn.ProcessElement.class.getSimpleName(),
-          DoFn.ProcessContext.class.getSimpleName(),
-          RestrictionTracker.class.getSimpleName(),
-          methodContext.getExtraParameters());
+      for (Parameter parameter : methodContext.getExtraParameters()) {
+        checkParameterOneOf(errors, parameter, ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS);
+      }
+    } else {
+      for (Parameter parameter : methodContext.getExtraParameters()) {
+        checkParameterOneOf(errors, parameter, ALLOWED_NON_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS);
+      }
     }
 
     return DoFnSignature.ProcessElementMethod.create(
@@ -703,6 +740,21 @@ public class DoFnSignatures {
         DoFn.ProcessContinuation.class.equals(m.getReturnType()));
   }
 
+  private static void checkParameterOneOf(
+      ErrorReporter errors,
+      Parameter parameter,
+      Collection<Class<? extends Parameter>> allowedParameterClasses) {
+
+    for (Class<? extends Parameter> paramClass : allowedParameterClasses) {
+      if (paramClass.isAssignableFrom(parameter.getClass())) {
+        return;
+      }
+    }
+
+    // If we get here, none matched
+    errors.throwIllegalArgument("Illegal parameter type: %s", parameter);
+  }
+
   private static Parameter analyzeExtraParameter(
       ErrorReporter methodErrors,
       FnAnalysisContext fnContext,
@@ -714,6 +766,7 @@ public class DoFnSignatures {
 
     TypeDescriptor<?> expectedProcessContextT = doFnProcessContextTypeOf(inputT, outputT);
     TypeDescriptor<?> expectedContextT = doFnContextTypeOf(inputT, outputT);
+    TypeDescriptor<?> expectedOnTimerContextT = doFnOnTimerContextTypeOf(inputT, outputT);
     TypeDescriptor<?> expectedInputProviderT = inputProviderTypeOf(inputT);
     TypeDescriptor<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
 
@@ -732,6 +785,11 @@ public class DoFnSignatures {
           "Must take %s as the Context argument",
           formatType(expectedContextT));
       return Parameter.context();
+    } else if (rawType.equals(DoFn.OnTimerContext.class)) {
+        methodErrors.checkArgument(paramT.equals(expectedOnTimerContextT),
+            "Must take %s as the OnTimerContext argument",
+            formatType(expectedOnTimerContextT));
+        return Parameter.onTimerContext();
     } else if (BoundedWindow.class.isAssignableFrom(rawType)) {
       methodErrors.checkArgument(
           !methodContext.hasWindowParameter(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42b506f0/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index 91f2d1b..7b594c9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -90,7 +90,8 @@ public class DoFnSignaturesSplittableDoFnTest {
   @Test
   public void testSplittableProcessElementMustNotHaveOtherParams() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("must have only ProcessContext and RestrictionTracker parameters");
+    thrown.expectMessage("Illegal parameter");
+    thrown.expectMessage("BoundedWindow");
 
     DoFnSignature.ProcessElementMethod signature =
         analyzeProcessElementMethod(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42b506f0/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 1381cd9..69d4058 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -442,6 +442,53 @@ public class DoFnSignaturesTest {
   }
 
   @Test
+  public void testSimpleTimerWithContext() throws Exception {
+    DoFnSignature sig =
+        DoFnSignatures.getSignature(
+            new DoFn<KV<String, Integer>, Long>() {
+              @TimerId("foo")
+              private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+              @ProcessElement
+              public void foo(ProcessContext context) {}
+
+              @OnTimer("foo")
+              public void onFoo(OnTimerContext c) {}
+            }.getClass());
+
+    assertThat(sig.timerDeclarations().size(), equalTo(1));
+    DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("foo");
+
+    assertThat(decl.id(), equalTo("foo"));
+    assertThat(decl.field().getName(), equalTo("bizzle"));
+
+    assertThat(
+        sig.onTimerMethods().get("foo").extraParameters().get(0),
+        equalTo((Parameter) Parameter.onTimerContext()));
+  }
+
+  @Test
+  public void testProcessElementWithOnTimerContextRejected() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    // The message should at least mention @ProcessElement and OnTimerContext
+    thrown.expectMessage("@" + DoFn.ProcessElement.class.getSimpleName());
+    thrown.expectMessage(DoFn.OnTimerContext.class.getSimpleName());
+
+    DoFnSignature sig =
+        DoFnSignatures.getSignature(
+            new DoFn<KV<String, Integer>, Long>() {
+              @TimerId("foo")
+              private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+              @ProcessElement
+              public void foo(ProcessContext context, OnTimerContext bogus) {}
+
+              @OnTimer("foo")
+              public void onFoo() {}
+            }.getClass());
+  }
+
+  @Test
   public void testSimpleTimerIdNamedDoFn() throws Exception {
     class DoFnForTestSimpleTimerIdNamedDoFn extends DoFn<KV<String, Integer>, Long> {
       @TimerId("foo")