You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/22 20:11:27 UTC
[23/50] incubator-beam git commit: Move DoFn.ArgumentProvider to
DoFnInvoker.ArgumentProvider
Move DoFn.ArgumentProvider to DoFnInvoker.ArgumentProvider
The arguments provided as a single object are an aspect of the
DoFnInvoker, not the DoFn. The DoFn itself is a specification
that may have other ways of being invoked, depending on the
circumstance.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/33fb8c2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/33fb8c2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/33fb8c2d
Branch: refs/heads/python-sdk
Commit: 33fb8c2db8c64275f1b9b8ac6dfd12e92d7fb777
Parents: bb9c386
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 17 23:04:55 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Nov 18 14:20:20 2016 -0800
----------------------------------------------------------------------
.../beam/runners/core/SimpleDoFnRunner.java | 4 +-
.../beam/runners/core/SplittableParDo.java | 7 +-
.../org/apache/beam/sdk/transforms/DoFn.java | 122 -------------------
.../beam/sdk/transforms/DoFnAdapters.java | 10 +-
.../reflect/ByteBuddyDoFnInvokerFactory.java | 41 +++----
.../sdk/transforms/reflect/DoFnInvoker.java | 121 +++++++++++++++++-
.../sdk/transforms/reflect/DoFnInvokers.java | 4 +-
.../sdk/transforms/reflect/OnTimerInvoker.java | 8 +-
.../transforms/reflect/DoFnInvokersTest.java | 5 +-
.../transforms/reflect/OnTimerInvokersTest.java | 2 +-
.../transforms/DoFnInvokersBenchmark.java | 5 +-
11 files changed, 161 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 76aae8f..841e412 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -183,7 +183,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
* @param <OutputT> the type of the {@link DoFn} (main) output elements
*/
private static class DoFnContext<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
- implements DoFn.ArgumentProvider<InputT, OutputT> {
+ implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
private static final int MAX_SIDE_OUTPUTS = 1000;
final PipelineOptions options;
@@ -424,7 +424,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
* @param <OutputT> the type of the {@link DoFn} (main) output elements
*/
private class DoFnProcessContext<InputT, OutputT> extends DoFn<InputT, OutputT>.ProcessContext
- implements DoFn.ArgumentProvider<InputT, OutputT> {
+ implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
final DoFn<InputT, OutputT> fn;
final DoFnContext<InputT, OutputT> context;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 3003984..c38ab2f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -392,10 +392,11 @@ public class SplittableParDo<
}
/**
- * Creates an {@link DoFn.ArgumentProvider} that provides the given tracker as well as the given
+ * Creates an {@link DoFnInvoker.ArgumentProvider} that provides the given tracker as well as
+ * the given
* {@link ProcessContext} (which is also provided when a {@link Context} is requested.
*/
- private DoFn.ArgumentProvider<InputT, OutputT> wrapTracker(
+ private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapTracker(
TrackerT tracker, DoFn<InputT, OutputT>.ProcessContext processContext) {
return new ArgumentProviderForTracker<>(tracker, processContext);
@@ -403,7 +404,7 @@ public class SplittableParDo<
private static class ArgumentProviderForTracker<
InputT, OutputT, TrackerT extends RestrictionTracker<?>>
- implements DoFn.ArgumentProvider<InputT, OutputT> {
+ implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
private final TrackerT tracker;
private final DoFn<InputT, OutputT>.ProcessContext processContext;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index bf0631b..9978ef4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -38,13 +38,11 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerSpec;
-import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.values.PCollection;
@@ -331,78 +329,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
return new TypeDescriptor<OutputT>(getClass()) {};
}
- /**
- * Interface for runner implementors to provide implementations of extra context information.
- *
- * <p>The methods on this interface are called by {@link DoFnInvoker} before invoking an
- * annotated {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that
- * has indicated it needs the given extra context.
- *
- * <p>In the case of {@link ProcessElement} it is called once per invocation of
- * {@link ProcessElement}.
- */
- public interface ArgumentProvider<InputT, OutputT> {
- /**
- * Construct the {@link BoundedWindow} to use within a {@link DoFn} that
- * needs it. This is called if the {@link ProcessElement} method has a parameter of type
- * {@link BoundedWindow}.
- *
- * @return {@link BoundedWindow} of the element currently being processed.
- */
- BoundedWindow window();
-
- /**
- * Provide a {@link DoFn.Context} to use with the given {@link DoFn}.
- */
- DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn);
-
- /**
- * Provide a {@link DoFn.ProcessContext} to use with the given {@link DoFn}.
- */
- DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn);
-
- /**
- * A placeholder for testing purposes.
- */
- InputProvider<InputT> inputProvider();
-
- /**
- * A placeholder for testing purposes.
- */
- OutputReceiver<OutputT> outputReceiver();
-
- /**
- * For migration from {@link OldDoFn} to {@link DoFn}, provide
- * a {@link WindowingInternals} so an {@link OldDoFn} can be run
- * via {@link DoFnInvoker}.
- *
- * <p>This is <i>not</i> exposed via the reflective capabilities
- * of {@link DoFn}.
- *
- * @deprecated Please port occurences of {@link OldDoFn} to {@link DoFn}. If they require
- * state and timers, they will need to wait for the arrival of those features. Do not introduce
- * new uses of this method.
- */
- @Deprecated
- WindowingInternals<InputT, OutputT> windowingInternals();
-
- /**
- * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with
- * the current {@link ProcessElement} call.
- */
- <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker();
-
- /**
- * Returns the state cell for the given {@link StateId}.
- */
- State state(String stateId);
-
- /**
- * Returns the timer for the given {@link TimerId}.
- */
- Timer timer(String timerId);
- }
-
/** Receives values of the given type. */
public interface OutputReceiver<T> {
void output(T output);
@@ -413,54 +339,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
T get();
}
- /** For testing only, this {@link ArgumentProvider} returns {@code null} for all parameters. */
- public static class FakeArgumentProvider<InputT, OutputT>
- implements ArgumentProvider<InputT, OutputT> {
- @Override
- public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
- return null;
- }
-
- @Override
- public BoundedWindow window() {
- return null;
- }
-
- @Override
- public DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn) {
- return null;
- }
-
- @Override
- public InputProvider<InputT> inputProvider() {
- return null;
- }
-
- @Override
- public OutputReceiver<OutputT> outputReceiver() {
- return null;
- }
-
- @Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- return null;
- }
-
- @Override
- public State state(String stateId) {
- return null;
- }
-
- @Override
- public Timer timer(String timerId) {
- return null;
- }
-
- public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
- return null;
- }
- }
-
/////////////////////////////////////////////////////////////////////////////
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 71a6d1d..a3466bb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -77,7 +77,7 @@ public class DoFnAdapters {
public static <InputT, OutputT> OldDoFn<InputT, OutputT>.ProcessContext adaptProcessContext(
OldDoFn<InputT, OutputT> fn,
final DoFn<InputT, OutputT>.ProcessContext c,
- final DoFn.ArgumentProvider<InputT, OutputT> extra) {
+ final DoFnInvoker.ArgumentProvider<InputT, OutputT> extra) {
return fn.new ProcessContext() {
@Override
public InputT element() {
@@ -270,12 +270,12 @@ public class DoFnAdapters {
}
/**
- * Wraps an {@link OldDoFn.Context} as a {@link DoFn.ArgumentProvider} inside a {@link
+ * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link
* DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is
* unavailable.
*/
private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
- implements DoFn.ArgumentProvider<InputT, OutputT> {
+ implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
private OldDoFn<InputT, OutputT>.Context context;
@@ -371,11 +371,11 @@ public class DoFnAdapters {
}
/**
- * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFn.ArgumentProvider} method.
+ * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method.
*/
private static class ProcessContextAdapter<InputT, OutputT>
extends DoFn<InputT, OutputT>.ProcessContext
- implements DoFn.ArgumentProvider<InputT, OutputT> {
+ implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
private OldDoFn<InputT, OutputT>.ProcessContext context;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index bc6d8c9..9998c9d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -101,7 +101,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
/**
* Creates a {@link DoFnInvoker} for the given {@link DoFn} by generating bytecode that directly
- * invokes its methods with arguments extracted from the {@link DoFn.ArgumentProvider}.
+ * invokes its methods with arguments extracted from the {@link DoFnInvoker.ArgumentProvider}.
*/
@Override
public <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(DoFn<InputT, OutputT> fn) {
@@ -149,19 +149,19 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
/**
* Associates the given timer ID with the given {@link OnTimerInvoker}.
*
- * <p>ByteBuddy does not like to generate conditional code, so we use a map + lookup
- * of the timer ID rather than a generated conditional branch to choose which
- * OnTimerInvoker to invoke.
+ * <p>ByteBuddy does not like to generate conditional code, so we use a map + lookup of the
+ * timer ID rather than a generated conditional branch to choose which OnTimerInvoker to invoke.
*
- * <p>This method has package level access as it is intended only for assembly of the
- * {@link DoFnInvokerBase} not by any subclass.
+ * <p>This method has package level access as it is intended only for assembly of the {@link
+ * DoFnInvokerBase} not by any subclass.
*/
void addOnTimerInvoker(String timerId, OnTimerInvoker onTimerInvoker) {
this.onTimerInvokers.put(timerId, onTimerInvoker);
}
@Override
- public void invokeOnTimer(String timerId, DoFn.ArgumentProvider<InputT, OutputT> arguments) {
+ public void invokeOnTimer(
+ String timerId, DoFnInvoker.ArgumentProvider<InputT, OutputT> arguments) {
@Nullable OnTimerInvoker onTimerInvoker = onTimerInvokers.get(timerId);
if (onTimerInvoker != null) {
@@ -193,8 +193,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
getByteBuddyInvokerConstructor(signature).newInstance(fn);
for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) {
- invoker.addOnTimerInvoker(onTimerMethod.id(),
- OnTimerInvokers.forTimer(fn, onTimerMethod.id()));
+ invoker.addOnTimerInvoker(
+ onTimerMethod.id(), OnTimerInvokers.forTimer(fn, onTimerMethod.id()));
}
return invoker;
@@ -326,8 +326,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
new DefaultRestrictionCoder(signature.getInitialRestriction().restrictionT()));
} else {
return new DowncastingParametersMethodDelegation(
- doFnType,
- signature.getRestrictionCoder().targetMethod());
+ doFnType, signature.getRestrictionCoder().targetMethod());
}
} else {
return ExceptionMethod.throwing(UnsupportedOperationException.class);
@@ -345,8 +344,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
}
/** Delegates to the given method if available, or does nothing. */
- private static Implementation delegateOrNoop(TypeDescription doFnType, DoFnSignature.DoFnMethod
- method) {
+ private static Implementation delegateOrNoop(
+ TypeDescription doFnType, DoFnSignature.DoFnMethod method) {
return (method == null)
? FixedValue.originType()
: new DoFnMethodDelegation(doFnType, method.targetMethod());
@@ -504,19 +503,19 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
String methodName, Class<?>... parameterTypes) {
try {
return new MethodDescription.ForLoadedMethod(
- DoFn.ArgumentProvider.class.getMethod(methodName, parameterTypes));
+ DoFnInvoker.ArgumentProvider.class.getMethod(methodName, parameterTypes));
} catch (Exception e) {
throw new IllegalStateException(
String.format(
"Failed to locate required method %s.%s",
- DoFn.ArgumentProvider.class.getSimpleName(), methodName),
+ DoFnInvoker.ArgumentProvider.class.getSimpleName(), methodName),
e);
}
}
/**
- * Calls a zero-parameter getter on the {@link DoFn.ArgumentProvider}, which must be on top of the
- * stack.
+ * Calls a zero-parameter getter on the {@link DoFnInvoker.ArgumentProvider}, which must be on top
+ * of the stack.
*/
private static StackManipulation simpleExtraContextParameter(String methodName) {
return new StackManipulation.Compound(
@@ -565,7 +564,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
@Override
public StackManipulation dispatch(RestrictionTrackerParameter p) {
- // DoFn.ArgumentProvider.restrictionTracker() returns a RestrictionTracker,
+ // DoFnInvoker.ArgumentProvider.restrictionTracker() returns a RestrictionTracker,
// but the @ProcessElement method expects a concrete subtype of it.
// Insert a downcast.
return new StackManipulation.Compound(
@@ -613,8 +612,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
private final DoFnSignature.ProcessElementMethod signature;
/** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */
- private ProcessElementDelegation(TypeDescription doFnType, DoFnSignature.ProcessElementMethod
- signature) {
+ private ProcessElementDelegation(
+ TypeDescription doFnType, DoFnSignature.ProcessElementMethod signature) {
super(doFnType, signature.targetMethod());
this.signature = signature;
}
@@ -622,7 +621,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
@Override
protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
// Parameters of the wrapper invoker method:
- // DoFn.ArgumentProvider
+ // DoFnInvoker.ArgumentProvider
// Parameters of the wrapped DoFn method:
// [DoFn.ProcessContext, BoundedWindow, InputProvider, OutputReceiver] in any order
ArrayList<StackManipulation> pushParameters = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 2ae7920..d899207 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -20,7 +20,19 @@ package org.apache.beam.sdk.transforms.reflect;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
+import org.apache.beam.sdk.transforms.DoFn.InputProvider;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.DoFn.StartBundle;
+import org.apache.beam.sdk.transforms.DoFn.StateId;
+import org.apache.beam.sdk.transforms.DoFn.TimerId;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.state.State;
/**
* Interface for invoking the {@code DoFn} processing methods.
@@ -48,11 +60,10 @@ public interface DoFnInvoker<InputT, OutputT> {
* @return The {@link DoFn.ProcessContinuation} returned by the underlying method, or {@link
* DoFn.ProcessContinuation#stop()} if it returns {@code void}.
*/
- DoFn.ProcessContinuation invokeProcessElement(DoFn.ArgumentProvider<InputT, OutputT> extra);
+ DoFn.ProcessContinuation invokeProcessElement(ArgumentProvider<InputT, OutputT> extra);
/** Invoke the appropriate {@link DoFn.OnTimer} method on the bound {@link DoFn}. */
- void invokeOnTimer(
- String timerId, DoFn.ArgumentProvider<InputT, OutputT> arguments);
+ void invokeOnTimer(String timerId, ArgumentProvider<InputT, OutputT> arguments);
/** Invoke the {@link DoFn.GetInitialRestriction} method on the bound {@link DoFn}. */
<RestrictionT> RestrictionT invokeGetInitialRestriction(InputT element);
@@ -72,4 +83,108 @@ public interface DoFnInvoker<InputT, OutputT> {
/** Invoke the {@link DoFn.NewTracker} method on the bound {@link DoFn}. */
<RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> TrackerT invokeNewTracker(
RestrictionT restriction);
+
+ /**
+ * Interface for runner implementors to provide implementations of extra context information.
+ *
+ * <p>The methods on this interface are called by {@link DoFnInvoker} before invoking an annotated
+ * {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that has indicated
+ * it needs the given extra context.
+ *
+ * <p>In the case of {@link ProcessElement} it is called once per invocation of {@link
+ * ProcessElement}.
+ */
+ interface ArgumentProvider<InputT, OutputT> {
+ /**
+ * Construct the {@link BoundedWindow} to use within a {@link DoFn} that needs it. This is
+ * called if the {@link ProcessElement} method has a parameter of type {@link BoundedWindow}.
+ *
+ * @return {@link BoundedWindow} of the element currently being processed.
+ */
+ BoundedWindow window();
+
+ /** Provide a {@link DoFn.Context} to use with the given {@link DoFn}. */
+ DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn);
+
+ /** Provide a {@link DoFn.ProcessContext} to use with the given {@link DoFn}. */
+ DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn);
+
+ /** A placeholder for testing purposes. */
+ InputProvider<InputT> inputProvider();
+
+ /** A placeholder for testing purposes. */
+ OutputReceiver<OutputT> outputReceiver();
+
+ /**
+ * For migration from {@link OldDoFn} to {@link DoFn}, provide a {@link WindowingInternals} so
+ * an {@link OldDoFn} can be run via {@link DoFnInvoker}.
+ *
+ * <p>This is <i>not</i> exposed via the reflective capabilities of {@link DoFn}.
+ *
+ * @deprecated Please port occurences of {@link OldDoFn} to {@link DoFn}. If they require state
+ * and timers, they will need to wait for the arrival of those features. Do not introduce
+ * new uses of this method.
+ */
+ @Deprecated
+ WindowingInternals<InputT, OutputT> windowingInternals();
+
+ /**
+ * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with
+ * the current {@link ProcessElement} call.
+ */
+ <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker();
+
+ /** Returns the state cell for the given {@link StateId}. */
+ State state(String stateId);
+
+ /** Returns the timer for the given {@link TimerId}. */
+ Timer timer(String timerId);
+ }
+
+ /** For testing only, this {@link ArgumentProvider} returns {@code null} for all parameters. */
+ class FakeArgumentProvider<InputT, OutputT> implements ArgumentProvider<InputT, OutputT> {
+ @Override
+ public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+ return null;
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return null;
+ }
+
+ @Override
+ public DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn) {
+ return null;
+ }
+
+ @Override
+ public InputProvider<InputT> inputProvider() {
+ return null;
+ }
+
+ @Override
+ public OutputReceiver<OutputT> outputReceiver() {
+ return null;
+ }
+
+ @Override
+ public WindowingInternals<InputT, OutputT> windowingInternals() {
+ return null;
+ }
+
+ @Override
+ public State state(String stateId) {
+ return null;
+ }
+
+ @Override
+ public Timer timer(String timerId) {
+ return null;
+ }
+
+ public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index 7eccaab..15ba198 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -100,7 +100,7 @@ public class DoFnInvokers {
@Override
public DoFn.ProcessContinuation invokeProcessElement(
- DoFn.ArgumentProvider<InputT, OutputT> extra) {
+ ArgumentProvider<InputT, OutputT> extra) {
// The outer DoFn is immaterial - it exists only to avoid typing InputT and OutputT repeatedly
DoFn<InputT, OutputT>.ProcessContext newCtx =
extra.processContext(new DoFn<InputT, OutputT>() {});
@@ -115,7 +115,7 @@ public class DoFnInvokers {
}
@Override
- public void invokeOnTimer(String timerId, DoFn.ArgumentProvider<InputT, OutputT> arguments) {
+ public void invokeOnTimer(String timerId, ArgumentProvider<InputT, OutputT> arguments) {
throw new UnsupportedOperationException(
String.format("Timers are not supported for %s", OldDoFn.class.getSimpleName()));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
index bfcafd0..3fbad0f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
@@ -17,11 +17,11 @@
*/
package org.apache.beam.sdk.transforms.reflect;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.OnTimer;
-/** Interface for invoking the {@link DoFn.OnTimer} method for a particular timer. */
+/** Interface for invoking the {@link OnTimer} method for a particular timer. */
public interface OnTimerInvoker<InputT, OutputT> {
- /** Invoke the {@link DoFn.OnTimer} method in the provided context. */
- void invokeOnTimer(DoFn.ArgumentProvider<InputT, OutputT> extra);
+ /** Invoke the {@link OnTimer} method in the provided context. */
+ void invokeOnTimer(DoFnInvoker.ArgumentProvider<InputT, OutputT> extra);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 3d9e3ec..456a6eb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -40,10 +40,9 @@ import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ArgumentProvider;
-import org.apache.beam.sdk.transforms.DoFn.FakeArgumentProvider;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -80,7 +79,7 @@ public class DoFnInvokersTest {
@Mock private DoFn.InputProvider<String> mockInputProvider;
@Mock private DoFn.OutputReceiver<String> mockOutputReceiver;
@Mock private WindowingInternals<String, String> mockWindowingInternals;
- @Mock private ArgumentProvider<String, String> mockArgumentProvider;
+ @Mock private DoFnInvoker.ArgumentProvider<String, String> mockArgumentProvider;
@Mock private OldDoFn<String, String> mockOldDoFn;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
index d51e9cc..177f15f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
@@ -43,7 +43,7 @@ public class OnTimerInvokersTest {
@Mock private BoundedWindow mockWindow;
- @Mock private DoFn.ArgumentProvider<String, String> mockArgumentProvider;
+ @Mock private DoFnInvoker.ArgumentProvider<String, String> mockArgumentProvider;
@Before
public void setUp() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
index e0fdac6..442bdec 100644
--- a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
+++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
@@ -21,10 +21,11 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.FakeArgumentProvider;
import org.apache.beam.sdk.transforms.DoFnAdapters;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.ArgumentProvider;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -56,7 +57,7 @@ public class DoFnInvokersBenchmark {
private StubOldDoFnProcessContext stubOldDoFnContext =
new StubOldDoFnProcessContext(oldDoFn, ELEMENT);
private StubDoFnProcessContext stubDoFnContext = new StubDoFnProcessContext(doFn, ELEMENT);
- private DoFn.ArgumentProvider<String, String> argumentProvider =
+ private ArgumentProvider<String, String> argumentProvider =
new FakeArgumentProvider<>();
private OldDoFn<String, String> adaptedDoFnWithContext;