You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/15 21:19:22 UTC
[1/2] incubator-beam git commit: This closes #1360
Repository: incubator-beam
Updated Branches:
refs/heads/master 503f26f44 -> 201110222
This closes #1360
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/20111022
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/20111022
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/20111022
Branch: refs/heads/master
Commit: 20111022256e641ad288c28d6953c15314eb6a0d
Parents: 503f26f 469c689
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 15 13:11:40 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 15 13:11:40 2016 -0800
----------------------------------------------------------------------
.../beam/runners/core/SimpleDoFnRunner.java | 4 +--
.../beam/runners/core/SplittableParDo.java | 16 ++++++----
.../org/apache/beam/sdk/transforms/DoFn.java | 8 ++---
.../beam/sdk/transforms/DoFnAdapters.java | 10 +++----
.../reflect/ByteBuddyDoFnInvokerFactory.java | 15 +++++-----
.../reflect/ByteBuddyOnTimerInvokerFactory.java | 4 +--
.../sdk/transforms/reflect/DoFnInvoker.java | 2 +-
.../sdk/transforms/reflect/DoFnInvokers.java | 3 +-
.../sdk/transforms/reflect/OnTimerInvoker.java | 2 +-
.../transforms/reflect/DoFnInvokersTest.java | 31 ++++++++++----------
.../transforms/reflect/OnTimerInvokersTest.java | 7 ++---
.../transforms/DoFnInvokersBenchmark.java | 8 ++---
12 files changed, 56 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: Rename DoFn.ExtraContextFactory to
DoFn.ArgumentProvider
Posted by ke...@apache.org.
Rename DoFn.ExtraContextFactory to DoFn.ArgumentProvider
The prior name started in the right place, but the role has gradually
morphed into a provider for all DoFn method arguments.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/469c689c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/469c689c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/469c689c
Branch: refs/heads/master
Commit: 469c689cc6bd0fe74658bf95b1e206cef3e0711d
Parents: 503f26f
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Nov 14 22:19:35 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 15 13:11:40 2016 -0800
----------------------------------------------------------------------
.../beam/runners/core/SimpleDoFnRunner.java | 4 +--
.../beam/runners/core/SplittableParDo.java | 16 ++++++----
.../org/apache/beam/sdk/transforms/DoFn.java | 8 ++---
.../beam/sdk/transforms/DoFnAdapters.java | 10 +++----
.../reflect/ByteBuddyDoFnInvokerFactory.java | 15 +++++-----
.../reflect/ByteBuddyOnTimerInvokerFactory.java | 4 +--
.../sdk/transforms/reflect/DoFnInvoker.java | 2 +-
.../sdk/transforms/reflect/DoFnInvokers.java | 3 +-
.../sdk/transforms/reflect/OnTimerInvoker.java | 2 +-
.../transforms/reflect/DoFnInvokersTest.java | 31 ++++++++++----------
.../transforms/reflect/OnTimerInvokersTest.java | 7 ++---
.../transforms/DoFnInvokersBenchmark.java | 8 ++---
12 files changed, 56 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/469c689c/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 1550303..c046d11 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -179,7 +179,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
* @param <OutputT> the type of the {@link DoFn} (main) output elements
*/
private static class DoFnContext<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
- implements DoFn.ExtraContextFactory<InputT, OutputT> {
+ implements DoFn.ArgumentProvider<InputT, OutputT> {
private static final int MAX_SIDE_OUTPUTS = 1000;
final PipelineOptions options;
@@ -422,7 +422,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
* @param <OutputT> the type of the {@link DoFn} (main) output elements
*/
private class DoFnProcessContext<InputT, OutputT> extends DoFn<InputT, OutputT>.ProcessContext
- implements DoFn.ExtraContextFactory<InputT, OutputT> {
+ implements DoFn.ArgumentProvider<InputT, OutputT> {
final DoFn<InputT, OutputT> fn;
final DoFnContext<InputT, OutputT> context;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/469c689c/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index e344f92..3003984 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -391,19 +391,23 @@ public class SplittableParDo<
};
}
- /** Creates an {@link DoFn.ExtraContextFactory} that provides just the given tracker. */
- private DoFn.ExtraContextFactory<InputT, OutputT> wrapTracker(
+ /**
+ * Creates an {@link DoFn.ArgumentProvider} that provides the given tracker as well as the given
+ * {@link ProcessContext} (which is also provided when a {@link Context} is requested.
+ */
+ private DoFn.ArgumentProvider<InputT, OutputT> wrapTracker(
TrackerT tracker, DoFn<InputT, OutputT>.ProcessContext processContext) {
- return new ExtraContextFactoryForTracker<>(tracker, processContext);
+
+ return new ArgumentProviderForTracker<>(tracker, processContext);
}
- private static class ExtraContextFactoryForTracker<
+ private static class ArgumentProviderForTracker<
InputT, OutputT, TrackerT extends RestrictionTracker<?>>
- implements DoFn.ExtraContextFactory<InputT, OutputT> {
+ implements DoFn.ArgumentProvider<InputT, OutputT> {
private final TrackerT tracker;
private final DoFn<InputT, OutputT>.ProcessContext processContext;
- ExtraContextFactoryForTracker(
+ ArgumentProviderForTracker(
TrackerT tracker, DoFn<InputT, OutputT>.ProcessContext processContext) {
this.tracker = tracker;
this.processContext = processContext;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/469c689c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 3c8e613..bf0631b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -341,7 +341,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* <p>In the case of {@link ProcessElement} it is called once per invocation of
* {@link ProcessElement}.
*/
- public interface ExtraContextFactory<InputT, OutputT> {
+ public interface ArgumentProvider<InputT, OutputT> {
/**
* Construct the {@link BoundedWindow} to use within a {@link DoFn} that
* needs it. This is called if the {@link ProcessElement} method has a parameter of type
@@ -413,9 +413,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
T get();
}
- /** For testing only, this {@link ExtraContextFactory} returns {@code null} for all parameters. */
- public static class FakeExtraContextFactory<InputT, OutputT>
- implements ExtraContextFactory<InputT, OutputT> {
+ /** For testing only, this {@link ArgumentProvider} returns {@code null} for all parameters. */
+ public static class FakeArgumentProvider<InputT, OutputT>
+ implements ArgumentProvider<InputT, OutputT> {
@Override
public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/469c689c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 73a1e40..71a6d1d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -77,7 +77,7 @@ public class DoFnAdapters {
public static <InputT, OutputT> OldDoFn<InputT, OutputT>.ProcessContext adaptProcessContext(
OldDoFn<InputT, OutputT> fn,
final DoFn<InputT, OutputT>.ProcessContext c,
- final DoFn.ExtraContextFactory<InputT, OutputT> extra) {
+ final DoFn.ArgumentProvider<InputT, OutputT> extra) {
return fn.new ProcessContext() {
@Override
public InputT element() {
@@ -270,12 +270,12 @@ public class DoFnAdapters {
}
/**
- * Wraps an {@link OldDoFn.Context} as a {@link DoFn.ExtraContextFactory} inside a {@link
+ * Wraps an {@link OldDoFn.Context} as a {@link DoFn.ArgumentProvider} inside a {@link
* DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is
* unavailable.
*/
private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
- implements DoFn.ExtraContextFactory<InputT, OutputT> {
+ implements DoFn.ArgumentProvider<InputT, OutputT> {
private OldDoFn<InputT, OutputT>.Context context;
@@ -371,11 +371,11 @@ public class DoFnAdapters {
}
/**
- * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFn.ExtraContextFactory} method.
+ * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFn.ArgumentProvider} method.
*/
private static class ProcessContextAdapter<InputT, OutputT>
extends DoFn<InputT, OutputT>.ProcessContext
- implements DoFn.ExtraContextFactory<InputT, OutputT> {
+ implements DoFn.ArgumentProvider<InputT, OutputT> {
private OldDoFn<InputT, OutputT>.ProcessContext context;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/469c689c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 38e1141..c137255 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -64,7 +64,6 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ContextParameter;
@@ -101,7 +100,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
/**
* Creates a {@link DoFnInvoker} for the given {@link DoFn} by generating bytecode that directly
- * invokes its methods with arguments extracted from the {@link ExtraContextFactory}.
+ * invokes its methods with arguments extracted from the {@link DoFn.ArgumentProvider}.
*/
@Override
public <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(DoFn<InputT, OutputT> fn) {
@@ -428,18 +427,18 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
String methodName, Class<?>... parameterTypes) {
try {
return new MethodDescription.ForLoadedMethod(
- ExtraContextFactory.class.getMethod(methodName, parameterTypes));
+ DoFn.ArgumentProvider.class.getMethod(methodName, parameterTypes));
} catch (Exception e) {
throw new IllegalStateException(
String.format(
"Failed to locate required method %s.%s",
- ExtraContextFactory.class.getSimpleName(), methodName),
+ DoFn.ArgumentProvider.class.getSimpleName(), methodName),
e);
}
}
/**
- * Calls a zero-parameter getter on the {@link ExtraContextFactory}, which must be on top of the
+ * Calls a zero-parameter getter on the {@link DoFn.ArgumentProvider}, which must be on top of the
* stack.
*/
private static StackManipulation simpleExtraContextParameter(String methodName) {
@@ -489,7 +488,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
@Override
public StackManipulation dispatch(RestrictionTrackerParameter p) {
- // ExtraContextFactory.restrictionTracker() returns a RestrictionTracker,
+ // DoFn.ArgumentProvider.restrictionTracker() returns a RestrictionTracker,
// but the @ProcessElement method expects a concrete subtype of it.
// Insert a downcast.
return new StackManipulation.Compound(
@@ -545,9 +544,9 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
@Override
protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
// Parameters of the wrapper invoker method:
- // DoFn.ProcessContext, ExtraContextFactory.
+ // DoFn.ArgumentProvider
// Parameters of the wrapped DoFn method:
- // DoFn.ProcessContext, [BoundedWindow, InputProvider, OutputReceiver] in any order
+ // [DoFn.ProcessContext, BoundedWindow, InputProvider, OutputReceiver] in any order
ArrayList<StackManipulation> pushParameters = new ArrayList<>();
// To load the delegate, push `this` and then access the field
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/469c689c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
index 3060733..4e53757 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
@@ -176,7 +176,7 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory {
.withParameter(fnClass)
.intercept(new InvokerConstructor())
- // public invokeOnTimer(ExtraContextFactory) {
+ // public invokeOnTimer(DoFn.ArgumentProvider) {
// this.delegate.<@OnTimer method>(... pass the right args ...)
// }
.method(ElementMatchers.named("invokeOnTimer"))
@@ -211,7 +211,7 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory {
@Override
protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
// Parameters of the wrapper invoker method:
- // ExtraContextFactory.
+ // DoFn.ArgumentProvider
// Parameters of the wrapped DoFn method:
// a dynamic set of allowed "extra" parameters in any order subject to
// validation prior to getting the DoFnSignature
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/469c689c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index d7011a2..ce68d0b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -48,7 +48,7 @@ public interface DoFnInvoker<InputT, OutputT> {
* @return The {@link DoFn.ProcessContinuation} returned by the underlying method, or {@link
* DoFn.ProcessContinuation#stop()} if it returns {@code void}.
*/
- DoFn.ProcessContinuation invokeProcessElement(DoFn.ExtraContextFactory<InputT, OutputT> extra);
+ DoFn.ProcessContinuation invokeProcessElement(DoFn.ArgumentProvider<InputT, OutputT> extra);
/** Invoke the {@link DoFn.GetInitialRestriction} method on the bound {@link DoFn}. */
<RestrictionT> RestrictionT invokeGetInitialRestriction(InputT element);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/469c689c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index c9e4bf1..9a96985 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -24,7 +24,6 @@ import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
import org.apache.beam.sdk.transforms.DoFnAdapters;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -101,7 +100,7 @@ public class DoFnInvokers {
@Override
public DoFn.ProcessContinuation invokeProcessElement(
- ExtraContextFactory<InputT, OutputT> extra) {
+ DoFn.ArgumentProvider<InputT, OutputT> extra) {
// The outer DoFn is immaterial - it exists only to avoid typing InputT and OutputT repeatedly
DoFn<InputT, OutputT>.ProcessContext newCtx =
extra.processContext(new DoFn<InputT, OutputT>() {});
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/469c689c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
index de9d667..f87fa74 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
@@ -23,5 +23,5 @@ import org.apache.beam.sdk.transforms.DoFn;
interface OnTimerInvoker<InputT, OutputT> {
/** Invoke the {@link DoFn.OnTimer} method in the provided context. */
- void invokeOnTimer(DoFn.ExtraContextFactory<InputT, OutputT> extra);
+ void invokeOnTimer(DoFn.ArgumentProvider<InputT, OutputT> extra);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/469c689c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 95dc643..c7b71ff 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -39,7 +39,8 @@ import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
+import org.apache.beam.sdk.transforms.DoFn.ArgumentProvider;
+import org.apache.beam.sdk.transforms.DoFn.FakeArgumentProvider;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
@@ -76,22 +77,22 @@ public class DoFnInvokersTest {
@Mock private DoFn.InputProvider<String> mockInputProvider;
@Mock private DoFn.OutputReceiver<String> mockOutputReceiver;
@Mock private WindowingInternals<String, String> mockWindowingInternals;
- @Mock private ExtraContextFactory<String, String> extraContextFactory;
+ @Mock private ArgumentProvider<String, String> mockArgumentProvider;
@Mock private OldDoFn<String, String> mockOldDoFn;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
- when(extraContextFactory.window()).thenReturn(mockWindow);
- when(extraContextFactory.inputProvider()).thenReturn(mockInputProvider);
- when(extraContextFactory.outputReceiver()).thenReturn(mockOutputReceiver);
- when(extraContextFactory.windowingInternals()).thenReturn(mockWindowingInternals);
- when(extraContextFactory.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext);
+ when(mockArgumentProvider.window()).thenReturn(mockWindow);
+ when(mockArgumentProvider.inputProvider()).thenReturn(mockInputProvider);
+ when(mockArgumentProvider.outputReceiver()).thenReturn(mockOutputReceiver);
+ when(mockArgumentProvider.windowingInternals()).thenReturn(mockWindowingInternals);
+ when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext);
}
private ProcessContinuation invokeProcessElement(DoFn<String, String> fn) {
- return DoFnInvokers.invokerFor(fn).invokeProcessElement(extraContextFactory);
+ return DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider);
}
@Test
@@ -188,7 +189,7 @@ public class DoFnInvokersTest {
public void testDoFnWithState() throws Exception {
ValueState<Integer> mockState = mock(ValueState.class);
final String stateId = "my-state-id-here";
- when(extraContextFactory.state(stateId)).thenReturn(mockState);
+ when(mockArgumentProvider.state(stateId)).thenReturn(mockState);
class MockFn extends DoFn<String, String> {
@StateId(stateId)
@@ -212,7 +213,7 @@ public class DoFnInvokersTest {
public void testDoFnWithTimer() throws Exception {
Timer mockTimer = mock(Timer.class);
final String timerId = "my-timer-id-here";
- when(extraContextFactory.timer(timerId)).thenReturn(mockTimer);
+ when(mockArgumentProvider.timer(timerId)).thenReturn(mockTimer);
class MockFn extends DoFn<String, String> {
@TimerId(timerId)
@@ -404,7 +405,7 @@ public class DoFnInvokersTest {
assertEquals(
ProcessContinuation.resume(),
invoker.invokeProcessElement(
- new DoFn.FakeExtraContextFactory<String, String>() {
+ new FakeArgumentProvider<String, String>() {
@Override
public DoFn<String, String>.ProcessContext processContext(DoFn<String, String> fn) {
return mockProcessContext;
@@ -455,7 +456,7 @@ public class DoFnInvokersTest {
}
});
assertEquals(
- ProcessContinuation.stop(), invoker.invokeProcessElement(extraContextFactory));
+ ProcessContinuation.stop(), invoker.invokeProcessElement(mockArgumentProvider));
}
// ---------------------------------------------------------------------------------------
@@ -534,7 +535,7 @@ public class DoFnInvokersTest {
});
thrown.expect(UserCodeException.class);
thrown.expectMessage("bogus");
- invoker.invokeProcessElement(new DoFn.FakeExtraContextFactory<Integer, Integer>() {
+ invoker.invokeProcessElement(new FakeArgumentProvider<Integer, Integer>() {
@Override
public DoFn<Integer, Integer>.ProcessContext processContext(DoFn<Integer, Integer> fn) {
return null;
@@ -565,7 +566,7 @@ public class DoFnInvokersTest {
return null;
}
})
- .invokeProcessElement(new DoFn.FakeExtraContextFactory<Integer, Integer>());
+ .invokeProcessElement(new FakeArgumentProvider<Integer, Integer>());
}
@Test
@@ -611,7 +612,7 @@ public class DoFnInvokersTest {
@Test
public void testOldDoFnProcessElement() throws Exception {
new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn)
- .invokeProcessElement(extraContextFactory);
+ .invokeProcessElement(mockArgumentProvider);
verify(mockOldDoFn).processElement(any(OldDoFn.ProcessContext.class));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/469c689c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
index d29810c..d51e9cc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerSpec;
@@ -44,16 +43,16 @@ public class OnTimerInvokersTest {
@Mock private BoundedWindow mockWindow;
- @Mock private ExtraContextFactory<String, String> mockExtraContextFactory;
+ @Mock private DoFn.ArgumentProvider<String, String> mockArgumentProvider;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
- when(mockExtraContextFactory.window()).thenReturn(mockWindow);
+ when(mockArgumentProvider.window()).thenReturn(mockWindow);
}
private void invokeOnTimer(DoFn<String, String> fn, String timerId) {
- OnTimerInvokers.forTimer(fn, timerId).invokeOnTimer(mockExtraContextFactory);
+ OnTimerInvokers.forTimer(fn, timerId).invokeOnTimer(mockArgumentProvider);
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/469c689c/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
index b55e17b..e0fdac6 100644
--- a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
+++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
@@ -21,7 +21,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
+import org.apache.beam.sdk.transforms.DoFn.FakeArgumentProvider;
import org.apache.beam.sdk.transforms.DoFnAdapters;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -56,8 +56,8 @@ public class DoFnInvokersBenchmark {
private StubOldDoFnProcessContext stubOldDoFnContext =
new StubOldDoFnProcessContext(oldDoFn, ELEMENT);
private StubDoFnProcessContext stubDoFnContext = new StubDoFnProcessContext(doFn, ELEMENT);
- private ExtraContextFactory<String, String> extraContextFactory =
- new DoFn.FakeExtraContextFactory<>();
+ private DoFn.ArgumentProvider<String, String> argumentProvider =
+ new FakeArgumentProvider<>();
private OldDoFn<String, String> adaptedDoFnWithContext;
@@ -83,7 +83,7 @@ public class DoFnInvokersBenchmark {
@Benchmark
public String invokeDoFnWithContext() throws Exception {
- invoker.invokeProcessElement(extraContextFactory);
+ invoker.invokeProcessElement(argumentProvider);
return stubDoFnContext.output;
}