You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/08 03:41:35 UTC

[36/50] incubator-beam git commit: Add DoFnInvoker dispatch for State and Timer parameters

Add DoFnInvoker dispatch for State and Timer parameters


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2db8268
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2db8268
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2db8268

Branch: refs/heads/gearpump-runner
Commit: e2db82686008aea224ca5cf1ef1acc2831c46ceb
Parents: c052d2a
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 3 19:18:24 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Nov 7 15:25:03 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     |  12 +++
 .../beam/runners/core/SplittableParDo.java      |  12 +++
 .../org/apache/beam/sdk/transforms/DoFn.java    |  20 ++++
 .../beam/sdk/transforms/DoFnAdapters.java       |  22 ++++
 .../sdk/transforms/reflect/DoFnInvokers.java    | 104 +++++++++++--------
 .../transforms/reflect/DoFnInvokersTest.java    |  59 ++++++++++-
 6 files changed, 187 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index dec9905..3abb06b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -48,11 +48,13 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.ExecutionContext.StepContext;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -532,6 +534,16 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException("State parameters are not supported.");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException("Timer parameters are not supported.");
+    }
+
+    @Override
     public WindowingInternals<InputT, OutputT> windowingInternals() {
       return new WindowingInternals<InputT, OutputT>() {
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 33d0ab7..d8ee1d5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -46,9 +46,11 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateTag;
@@ -432,6 +434,16 @@ public class SplittableParDo<
       public TrackerT restrictionTracker() {
         return tracker;
       }
+
+      @Override
+      public State state(String stateId) {
+        throw new UnsupportedOperationException("State cannot be used with a splittable DoFn");
+      }
+
+      @Override
+      public Timer timer(String timerId) {
+        throw new UnsupportedOperationException("Timers cannot be used with a splittable DoFn");
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 2b3962e..876dfe2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -381,6 +381,16 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
      * the current {@link ProcessElement} call.
      */
     <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker();
+
+    /**
+     * Returns the state cell for the given {@link StateId}.
+     */
+    State state(String stateId);
+
+    /**
+     * Returns the timer for the given {@link TimerId}.
+     */
+    Timer timer(String timerId);
   }
 
   /** Receives values of the given type. */
@@ -416,6 +426,16 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
       return null;
     }
 
+    @Override
+    public State state(String stateId) {
+      return null;
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      return null;
+    }
+
     public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index ca724cd..420304b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -28,7 +28,9 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -343,6 +345,16 @@ public class DoFnAdapters {
     public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
       throw new UnsupportedOperationException("This is a non-splittable DoFn");
     }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException("State is not supported by this runner");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException("Timers are not supported by this runner");
+    }
   }
 
   /**
@@ -436,5 +448,15 @@ public class DoFnAdapters {
     public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
       throw new UnsupportedOperationException("This is a non-splittable DoFn");
     }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException("State is not supported by this runner");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException("Timers are not supported by this runner");
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index ad2b766..b7f75ed 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -23,8 +23,6 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -52,6 +50,7 @@ import net.bytebuddy.implementation.bytecode.StackManipulation;
 import net.bytebuddy.implementation.bytecode.Throw;
 import net.bytebuddy.implementation.bytecode.assign.Assigner;
 import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import net.bytebuddy.implementation.bytecode.constant.TextConstant;
 import net.bytebuddy.implementation.bytecode.member.FieldAccess;
 import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
 import net.bytebuddy.implementation.bytecode.member.MethodReturn;
@@ -77,6 +76,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Restrictio
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
@@ -503,15 +503,15 @@ public class DoFnInvokers {
     }
   }
 
-  private static StackManipulation simpleExtraContextParameter(
-    String methodName,
-    StackManipulation pushExtraContextFactory) {
+  /**
+   * This wrapper exists to convert checked exceptions to unchecked exceptions, since if this fails
+   * the library itself is malformed.
+   */
+  private static MethodDescription getExtraContextFactoryMethodDescription(
+      String methodName, Class<?>... parameterTypes) {
     try {
-      return new StackManipulation.Compound(
-        pushExtraContextFactory,
-        MethodInvocation.invoke(
-            new MethodDescription.ForLoadedMethod(
-                DoFn.ExtraContextFactory.class.getMethod(methodName))));
+    return new MethodDescription.ForLoadedMethod(
+                DoFn.ExtraContextFactory.class.getMethod(methodName, parameterTypes));
     } catch (Exception e) {
       throw new IllegalStateException(
           String.format(
@@ -521,47 +521,69 @@ public class DoFnInvokers {
     }
   }
 
+  private static StackManipulation simpleExtraContextParameter(
+    String methodName,
+    StackManipulation pushExtraContextFactory) {
+      return new StackManipulation.Compound(
+        pushExtraContextFactory,
+        MethodInvocation.invoke(getExtraContextFactoryMethodDescription(methodName)));
+  }
+
   private static StackManipulation getExtraContextParameter(
       DoFnSignature.Parameter parameter,
       final StackManipulation pushExtraContextFactory) {
 
-    return parameter.match(new Cases<StackManipulation>() {
+    return parameter.match(
+        new Cases<StackManipulation>() {
 
-      @Override
-      public StackManipulation dispatch(BoundedWindowParameter p) {
-        return simpleExtraContextParameter("window", pushExtraContextFactory);
-      }
+          @Override
+          public StackManipulation dispatch(BoundedWindowParameter p) {
+            return simpleExtraContextParameter("window", pushExtraContextFactory);
+          }
 
-      @Override
-      public StackManipulation dispatch(InputProviderParameter p) {
-        return simpleExtraContextParameter("inputProvider", pushExtraContextFactory);
-      }
+          @Override
+          public StackManipulation dispatch(InputProviderParameter p) {
+            return simpleExtraContextParameter("inputProvider", pushExtraContextFactory);
+          }
 
-      @Override
-      public StackManipulation dispatch(OutputReceiverParameter p) {
-        return simpleExtraContextParameter("outputReceiver", pushExtraContextFactory);
-      }
+          @Override
+          public StackManipulation dispatch(OutputReceiverParameter p) {
+            return simpleExtraContextParameter("outputReceiver", pushExtraContextFactory);
+          }
 
-      @Override
-      public StackManipulation dispatch(RestrictionTrackerParameter p) {
-        // ExtraContextFactory.restrictionTracker() returns a RestrictionTracker,
-        // but the @ProcessElement method expects a concrete subtype of it.
-        // Insert a downcast.
-        return new StackManipulation.Compound(
-            simpleExtraContextParameter("restrictionTracker", pushExtraContextFactory),
-            TypeCasting.to(new TypeDescription.ForLoadedType(p.trackerT().getRawType())));
-      }
+          @Override
+          public StackManipulation dispatch(RestrictionTrackerParameter p) {
+            // ExtraContextFactory.restrictionTracker() returns a RestrictionTracker,
+            // but the @ProcessElement method expects a concrete subtype of it.
+            // Insert a downcast.
+            return new StackManipulation.Compound(
+                simpleExtraContextParameter("restrictionTracker", pushExtraContextFactory),
+                TypeCasting.to(new TypeDescription.ForLoadedType(p.trackerT().getRawType())));
+          }
 
-      @Override
-      public StackManipulation dispatch(StateParameter p) {
-        throw new UnsupportedOperationException("State parameters are not yet supported.");
-      }
+          @Override
+          public StackManipulation dispatch(StateParameter p) {
+            return new StackManipulation.Compound(
+                // TOP = extraContextFactory.state(<id>)
+                pushExtraContextFactory,
+                new TextConstant(p.referent().id()),
+                MethodInvocation.invoke(
+                    getExtraContextFactoryMethodDescription("state", String.class)),
+                TypeCasting.to(
+                    new TypeDescription.ForLoadedType(p.referent().stateType().getRawType())));
+          }
 
-      @Override
-      public StackManipulation dispatch(TimerParameter p) {
-        throw new UnsupportedOperationException("Timer parameters are not yet supported.");
-      }
-    });
+          @Override
+          public StackManipulation dispatch(TimerParameter p) {
+            return new StackManipulation.Compound(
+                // TOP = extraContextFactory.timer(<id>)
+                pushExtraContextFactory,
+                new TextConstant(p.referent().id()),
+                MethodInvocation.invoke(
+                    getExtraContextFactoryMethodDescription("timer", String.class)),
+                TypeCasting.to(new TypeDescription.ForLoadedType(Timer.class)));
+          }
+        });
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index dbb7955..60f82a8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -37,16 +37,23 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.GetInitialRestriction;
 import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
+import org.apache.beam.sdk.util.state.ValueState;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -173,6 +180,56 @@ public class DoFnInvokersTest {
     verify(fn).processElement(mockContext, mockWindow);
   }
 
+  /**
+   * Tests that the generated {@link DoFnInvoker} passes the state parameter that it
+   * should.
+   */
+  @Test
+  public void testDoFnWithState() throws Exception {
+    ValueState<Integer> mockState = mock(ValueState.class);
+    final String stateId = "my-state-id-here";
+    when(extraContextFactory.state(stateId)).thenReturn(mockState);
+
+    class MockFn extends DoFn<String, String> {
+      @StateId(stateId)
+      private final StateSpec<Object, ValueState<Integer>> spec =
+          StateSpecs.value(VarIntCoder.of());
+
+      @ProcessElement
+      public void processElement(ProcessContext c, @StateId(stateId) ValueState<Integer> valueState)
+          throws Exception {}
+    }
+    MockFn fn = mock(MockFn.class);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+    verify(fn).processElement(mockContext, mockState);
+  }
+
+  /**
+   * Tests that the generated {@link DoFnInvoker} passes the timer parameter that it
+   * should.
+   */
+  @Test
+  public void testDoFnWithTimer() throws Exception {
+    Timer mockTimer = mock(Timer.class);
+    final String timerId = "my-timer-id-here";
+    when(extraContextFactory.timer(timerId)).thenReturn(mockTimer);
+
+    class MockFn extends DoFn<String, String> {
+      @TimerId(timerId)
+      private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+      @ProcessElement
+      public void processElement(ProcessContext c, @TimerId(timerId) Timer timer)
+          throws Exception {}
+
+      @OnTimer(timerId)
+      public void onTimer() {}
+    }
+    MockFn fn = mock(MockFn.class);
+    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+    verify(fn).processElement(mockContext, mockTimer);
+  }
+
   @Test
   public void testDoFnWithOutputReceiver() throws Exception {
     class MockFn extends DoFn<String, String> {