You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/07 23:32:30 UTC
[5/6] incubator-beam git commit: Add DoFnInvoker dispatch for State
and Timer parameters
Add DoFnInvoker dispatch for State and Timer parameters
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2db8268
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2db8268
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2db8268
Branch: refs/heads/master
Commit: e2db82686008aea224ca5cf1ef1acc2831c46ceb
Parents: c052d2a
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 3 19:18:24 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Nov 7 15:25:03 2016 -0800
----------------------------------------------------------------------
.../beam/runners/core/SimpleDoFnRunner.java | 12 +++
.../beam/runners/core/SplittableParDo.java | 12 +++
.../org/apache/beam/sdk/transforms/DoFn.java | 20 ++++
.../beam/sdk/transforms/DoFnAdapters.java | 22 ++++
.../sdk/transforms/reflect/DoFnInvokers.java | 104 +++++++++++--------
.../transforms/reflect/DoFnInvokersTest.java | 59 ++++++++++-
6 files changed, 187 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index dec9905..3abb06b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -48,11 +48,13 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.ExecutionContext.StepContext;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
@@ -532,6 +534,16 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
@Override
+ public State state(String timerId) {
+ throw new UnsupportedOperationException("State parameters are not supported.");
+ }
+
+ @Override
+ public Timer timer(String timerId) {
+ throw new UnsupportedOperationException("Timer parameters are not supported.");
+ }
+
+ @Override
public WindowingInternals<InputT, OutputT> windowingInternals() {
return new WindowingInternals<InputT, OutputT>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 33d0ab7..d8ee1d5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -46,9 +46,11 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateTag;
@@ -432,6 +434,16 @@ public class SplittableParDo<
public TrackerT restrictionTracker() {
return tracker;
}
+
+ @Override
+ public State state(String stateId) {
+ throw new UnsupportedOperationException("State cannot be used with a splittable DoFn");
+ }
+
+ @Override
+ public Timer timer(String timerId) {
+ throw new UnsupportedOperationException("Timers cannot be used with a splittable DoFn");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 2b3962e..876dfe2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -381,6 +381,16 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* the current {@link ProcessElement} call.
*/
<RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker();
+
+ /**
+ * Returns the state cell for the given {@link StateId}.
+ */
+ State state(String stateId);
+
+ /**
+ * Returns the timer for the given {@link TimerId}.
+ */
+ Timer timer(String timerId);
}
/** Receives values of the given type. */
@@ -416,6 +426,16 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
return null;
}
+ @Override
+ public State state(String stateId) {
+ return null;
+ }
+
+ @Override
+ public Timer timer(String timerId) {
+ return null;
+ }
+
public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index ca724cd..420304b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -28,7 +28,9 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -343,6 +345,16 @@ public class DoFnAdapters {
public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
throw new UnsupportedOperationException("This is a non-splittable DoFn");
}
+
+ @Override
+ public State state(String stateId) {
+ throw new UnsupportedOperationException("State is not supported by this runner");
+ }
+
+ @Override
+ public Timer timer(String timerId) {
+ throw new UnsupportedOperationException("Timers are not supported by this runner");
+ }
}
/**
@@ -436,5 +448,15 @@ public class DoFnAdapters {
public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
throw new UnsupportedOperationException("This is a non-splittable DoFn");
}
+
+ @Override
+ public State state(String stateId) {
+ throw new UnsupportedOperationException("State is not supported by this runner");
+ }
+
+ @Override
+ public Timer timer(String timerId) {
+ throw new UnsupportedOperationException("Timers are not supported by this runner");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index ad2b766..b7f75ed 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -23,8 +23,6 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -52,6 +50,7 @@ import net.bytebuddy.implementation.bytecode.StackManipulation;
import net.bytebuddy.implementation.bytecode.Throw;
import net.bytebuddy.implementation.bytecode.assign.Assigner;
import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import net.bytebuddy.implementation.bytecode.constant.TextConstant;
import net.bytebuddy.implementation.bytecode.member.FieldAccess;
import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
import net.bytebuddy.implementation.bytecode.member.MethodReturn;
@@ -77,6 +76,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Restrictio
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -503,15 +503,15 @@ public class DoFnInvokers {
}
}
- private static StackManipulation simpleExtraContextParameter(
- String methodName,
- StackManipulation pushExtraContextFactory) {
+ /**
+ * This wrapper exists to convert checked exceptions to unchecked exceptions, since if this fails
+ * the library itself is malformed.
+ */
+ private static MethodDescription getExtraContextFactoryMethodDescription(
+ String methodName, Class<?>... parameterTypes) {
try {
- return new StackManipulation.Compound(
- pushExtraContextFactory,
- MethodInvocation.invoke(
- new MethodDescription.ForLoadedMethod(
- DoFn.ExtraContextFactory.class.getMethod(methodName))));
+ return new MethodDescription.ForLoadedMethod(
+ DoFn.ExtraContextFactory.class.getMethod(methodName, parameterTypes));
} catch (Exception e) {
throw new IllegalStateException(
String.format(
@@ -521,47 +521,69 @@ public class DoFnInvokers {
}
}
+ private static StackManipulation simpleExtraContextParameter(
+ String methodName,
+ StackManipulation pushExtraContextFactory) {
+ return new StackManipulation.Compound(
+ pushExtraContextFactory,
+ MethodInvocation.invoke(getExtraContextFactoryMethodDescription(methodName)));
+ }
+
private static StackManipulation getExtraContextParameter(
DoFnSignature.Parameter parameter,
final StackManipulation pushExtraContextFactory) {
- return parameter.match(new Cases<StackManipulation>() {
+ return parameter.match(
+ new Cases<StackManipulation>() {
- @Override
- public StackManipulation dispatch(BoundedWindowParameter p) {
- return simpleExtraContextParameter("window", pushExtraContextFactory);
- }
+ @Override
+ public StackManipulation dispatch(BoundedWindowParameter p) {
+ return simpleExtraContextParameter("window", pushExtraContextFactory);
+ }
- @Override
- public StackManipulation dispatch(InputProviderParameter p) {
- return simpleExtraContextParameter("inputProvider", pushExtraContextFactory);
- }
+ @Override
+ public StackManipulation dispatch(InputProviderParameter p) {
+ return simpleExtraContextParameter("inputProvider", pushExtraContextFactory);
+ }
- @Override
- public StackManipulation dispatch(OutputReceiverParameter p) {
- return simpleExtraContextParameter("outputReceiver", pushExtraContextFactory);
- }
+ @Override
+ public StackManipulation dispatch(OutputReceiverParameter p) {
+ return simpleExtraContextParameter("outputReceiver", pushExtraContextFactory);
+ }
- @Override
- public StackManipulation dispatch(RestrictionTrackerParameter p) {
- // ExtraContextFactory.restrictionTracker() returns a RestrictionTracker,
- // but the @ProcessElement method expects a concrete subtype of it.
- // Insert a downcast.
- return new StackManipulation.Compound(
- simpleExtraContextParameter("restrictionTracker", pushExtraContextFactory),
- TypeCasting.to(new TypeDescription.ForLoadedType(p.trackerT().getRawType())));
- }
+ @Override
+ public StackManipulation dispatch(RestrictionTrackerParameter p) {
+ // ExtraContextFactory.restrictionTracker() returns a RestrictionTracker,
+ // but the @ProcessElement method expects a concrete subtype of it.
+ // Insert a downcast.
+ return new StackManipulation.Compound(
+ simpleExtraContextParameter("restrictionTracker", pushExtraContextFactory),
+ TypeCasting.to(new TypeDescription.ForLoadedType(p.trackerT().getRawType())));
+ }
- @Override
- public StackManipulation dispatch(StateParameter p) {
- throw new UnsupportedOperationException("State parameters are not yet supported.");
- }
+ @Override
+ public StackManipulation dispatch(StateParameter p) {
+ return new StackManipulation.Compound(
+ // TOP = extraContextFactory.state(<id>)
+ pushExtraContextFactory,
+ new TextConstant(p.referent().id()),
+ MethodInvocation.invoke(
+ getExtraContextFactoryMethodDescription("state", String.class)),
+ TypeCasting.to(
+ new TypeDescription.ForLoadedType(p.referent().stateType().getRawType())));
+ }
- @Override
- public StackManipulation dispatch(TimerParameter p) {
- throw new UnsupportedOperationException("Timer parameters are not yet supported.");
- }
- });
+ @Override
+ public StackManipulation dispatch(TimerParameter p) {
+ return new StackManipulation.Compound(
+ // TOP = extraContextFactory.state(<id>)
+ pushExtraContextFactory,
+ new TextConstant(p.referent().id()),
+ MethodInvocation.invoke(
+ getExtraContextFactoryMethodDescription("timer", String.class)),
+ TypeCasting.to(new TypeDescription.ForLoadedType(Timer.class)));
+ }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index dbb7955..60f82a8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -37,16 +37,23 @@ import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.GetInitialRestriction;
import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
+import org.apache.beam.sdk.util.state.ValueState;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -173,6 +180,56 @@ public class DoFnInvokersTest {
verify(fn).processElement(mockContext, mockWindow);
}
+ /**
+ * Tests that the generated {@link DoFnInvoker} passes the state parameter that it
+ * should.
+ */
+ @Test
+ public void testDoFnWithState() throws Exception {
+ ValueState<Integer> mockState = mock(ValueState.class);
+ final String stateId = "my-state-id-here";
+ when(extraContextFactory.state(stateId)).thenReturn(mockState);
+
+ class MockFn extends DoFn<String, String> {
+ @StateId(stateId)
+ private final StateSpec<Object, ValueState<Integer>> spec =
+ StateSpecs.value(VarIntCoder.of());
+
+ @ProcessElement
+ public void processElement(ProcessContext c, @StateId(stateId) ValueState<Integer> valueState)
+ throws Exception {}
+ }
+ MockFn fn = mock(MockFn.class);
+ assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+ verify(fn).processElement(mockContext, mockState);
+ }
+
+ /**
+ * Tests that the generated {@link DoFnInvoker} passes the timer parameter that it
+ * should.
+ */
+ @Test
+ public void testDoFnWithTimer() throws Exception {
+ Timer mockTimer = mock(Timer.class);
+ final String timerId = "my-timer-id-here";
+ when(extraContextFactory.timer(timerId)).thenReturn(mockTimer);
+
+ class MockFn extends DoFn<String, String> {
+ @TimerId(timerId)
+ private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void processElement(ProcessContext c, @TimerId(timerId) Timer timer)
+ throws Exception {}
+
+ @OnTimer(timerId)
+ public void onTimer() {}
+ }
+ MockFn fn = mock(MockFn.class);
+ assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+ verify(fn).processElement(mockContext, mockTimer);
+ }
+
@Test
public void testDoFnWithOutputReceiver() throws Exception {
class MockFn extends DoFn<String, String> {