You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/02 23:42:46 UTC
[2/4] incubator-beam git commit: Supports window parameter in
DoFnTester
Supports window parameter in DoFnTester
Also prohibits other parameters, and prohibits output from bundle
methods (whereas previously it was silently dropped).
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/78ac009b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/78ac009b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/78ac009b
Branch: refs/heads/master
Commit: 78ac009be743a2e053580e9966f841174b636e88
Parents: 9645576
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 2 11:39:48 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Dec 2 15:42:33 2016 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/DoFnTester.java | 166 ++++++++++++++-----
.../beam/sdk/transforms/DoFnTesterTest.java | 34 ++++
2 files changed, 158 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78ac009b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index a9f93dd..7c1abef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -38,13 +38,18 @@ import org.apache.beam.sdk.testing.ValueInSingleWindow;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.state.InMemoryStateInternals;
import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
import org.apache.beam.sdk.util.state.StateInternals;
@@ -84,6 +89,9 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
/**
* Returns a {@code DoFnTester} supporting unit-testing of the given
* {@link DoFn}. By default, uses {@link CloningBehavior#CLONE_ONCE}.
+ *
+ * <p>The only supported extra parameter of the {@link DoFn.ProcessElement} method is
+ * {@link BoundedWindow}.
*/
@SuppressWarnings("unchecked")
public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
@@ -236,7 +244,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
if (state == State.UNINITIALIZED) {
initializeState();
}
- TestContext context = createContext(fn);
+ TestContext context = new TestContext();
context.setupDelegateAggregators();
// State and timer internals are per-bundle.
stateInternals = InMemoryStateInternals.forKey(new Object());
@@ -262,7 +270,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
/**
* Calls the {@link DoFn.ProcessElement} method on the {@link DoFn} under test, in a
* context where {@link DoFn.ProcessContext#element} returns the
- * given element.
+ * given element and the element is in the global window.
*
* <p>Will call {@link #startBundle} automatically, if it hasn't
* already been called.
@@ -277,26 +285,86 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
/**
* Calls {@link DoFn.ProcessElement} on the {@code DoFn} under test, in a
* context where {@link DoFn.ProcessContext#element} returns the
- * given element and timestamp.
+ * given element and timestamp and the element is in the global window.
*
* <p>Will call {@link #startBundle} automatically, if it hasn't
* already been called.
- *
- * <p>If the input timestamp is {@literal null}, the minimum timestamp will be used.
*/
public void processTimestampedElement(TimestampedValue<InputT> element) throws Exception {
checkNotNull(element, "Timestamped element cannot be null");
+ processWindowedElement(
+ element.getValue(), element.getTimestamp(), GlobalWindow.INSTANCE);
+ }
+
+ /**
+ * Calls {@link DoFn.ProcessElement} on the {@code DoFn} under test, in a
+ * context where {@link DoFn.ProcessContext#element} returns the
+ * given element and timestamp and the element is in the given window.
+ *
+ * <p>Will call {@link #startBundle} automatically, if it hasn't
+ * already been called.
+ */
+ public void processWindowedElement(
+ InputT element, Instant timestamp, final BoundedWindow window) throws Exception {
if (state != State.BUNDLE_STARTED) {
startBundle();
}
try {
- final TestProcessContext processContext = createProcessContext(element);
- fnInvoker.invokeProcessElement(new DoFnInvoker.FakeArgumentProvider<InputT, OutputT>() {
- @Override
- public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
- return processContext;
- }
- });
+ final TestProcessContext processContext =
+ new TestProcessContext(
+ ValueInSingleWindow.of(element, timestamp, window, PaneInfo.NO_FIRING));
+ fnInvoker.invokeProcessElement(
+ new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
+ @Override
+ public BoundedWindow window() {
+ return window;
+ }
+
+ @Override
+ public DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException(
+ "Not expected to access DoFn.Context from @ProcessElement");
+ }
+
+ @Override
+ public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+ return processContext;
+ }
+
+ @Override
+ public DoFn.InputProvider<InputT> inputProvider() {
+ throw new UnsupportedOperationException(
+ "Not expected to access InputProvider from DoFnTester");
+ }
+
+ @Override
+ public DoFn.OutputReceiver<OutputT> outputReceiver() {
+ throw new UnsupportedOperationException(
+ "Not expected to access OutputReceiver from DoFnTester");
+ }
+
+ @Override
+ public WindowingInternals<InputT, OutputT> windowingInternals() {
+ throw new UnsupportedOperationException(
+ "Not expected to access WindowingInternals from a new DoFn");
+ }
+
+ @Override
+ public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+ throw new UnsupportedOperationException(
+ "Not expected to access RestrictionTracker from a regular DoFn in DoFnTester");
+ }
+
+ @Override
+ public org.apache.beam.sdk.util.state.State state(String stateId) {
+ throw new UnsupportedOperationException("DoFnTester doesn't support state yet");
+ }
+
+ @Override
+ public Timer timer(String timerId) {
+ throw new UnsupportedOperationException("DoFnTester doesn't support timers yet");
+ }
+ });
} catch (UserCodeException e) {
unwrapUserCodeException(e);
}
@@ -318,7 +386,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
"Must be inside bundle to call finishBundle, but was: %s",
state);
try {
- fnInvoker.invokeFinishBundle(createContext(fn));
+ fnInvoker.invokeFinishBundle(new TestContext());
} catch (UserCodeException e) {
unwrapUserCodeException(e);
}
@@ -543,10 +611,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
return mainOutputTag;
}
- private TestContext createContext(DoFn<InputT, OutputT> fn) {
- return new TestContext();
- }
-
private class TestContext extends DoFn<InputT, OutputT>.Context {
TestContext() {
fn.super();
@@ -559,12 +623,27 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
@Override
public void output(OutputT output) {
- sideOutput(mainOutputTag, output);
+ throwUnsupportedOutputFromBundleMethods();
}
@Override
public void outputWithTimestamp(OutputT output, Instant timestamp) {
- sideOutputWithTimestamp(mainOutputTag, output, timestamp);
+ throwUnsupportedOutputFromBundleMethods();
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ throwUnsupportedOutputFromBundleMethods();
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ throwUnsupportedOutputFromBundleMethods();
+ }
+
+ private void throwUnsupportedOutputFromBundleMethods() {
+ throw new UnsupportedOperationException(
+ "DoFnTester doesn't support output from bundle methods");
}
@Override
@@ -613,26 +692,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
return aggregator;
}
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- sideOutputWithTimestamp(tag, output, BoundedWindow.TIMESTAMP_MIN_VALUE);
- }
-
- public <T> void noteOutput(TupleTag<T> tag, ValueInSingleWindow<T> output) {
- getMutableOutput(tag).add(output);
- }
- }
-
- private TestProcessContext createProcessContext(TimestampedValue<InputT> elem) {
- return new TestProcessContext(
- ValueInSingleWindow.of(
- elem.getValue(), elem.getTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
}
private class TestProcessContext extends DoFn<InputT, OutputT>.ProcessContext {
@@ -641,7 +700,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
private TestProcessContext(ValueInSingleWindow<InputT> element) {
fn.super();
- this.context = createContext(fn);
+ this.context = new TestContext();
this.element = element;
}
@@ -699,8 +758,8 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
@Override
public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- context.noteOutput(
- tag, ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane()));
+ getMutableOutput(tag)
+ .add(ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane()));
}
@Override
@@ -772,6 +831,29 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
private DoFnTester(DoFn<InputT, OutputT> origFn) {
this.origFn = origFn;
+ DoFnSignature signature = DoFnSignatures.signatureForDoFn(origFn);
+ for (DoFnSignature.Parameter param : signature.processElement().extraParameters()) {
+ param.match(
+ new DoFnSignature.Parameter.Cases.WithDefault<Void>() {
+ @Override
+ public Void dispatch(DoFnSignature.Parameter.ProcessContextParameter p) {
+ // ProcessContext parameter is obviously supported.
+ return null;
+ }
+
+ @Override
+ public Void dispatch(DoFnSignature.Parameter.WindowParameter p) {
+ // We also support the BoundedWindow parameter.
+ return null;
+ }
+
+ @Override
+ protected Void dispatchDefault(DoFnSignature.Parameter p) {
+ throw new UnsupportedOperationException(
+ "Parameter " + p + " not supported by DoFnTester");
+ }
+ });
+ }
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78ac009b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index ff8a9bc..b47465e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -30,13 +30,16 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Matchers;
+import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
@@ -350,6 +353,37 @@ public class DoFnTesterTest {
}
}
+ @Test
+ public void testSupportsWindowParameter() throws Exception {
+ Instant now = Instant.now();
+ try (DoFnTester<Integer, KV<Integer, BoundedWindow>> tester =
+ DoFnTester.of(new DoFnWithWindowParameter())) {
+ BoundedWindow firstWindow = new IntervalWindow(now, now.plus(Duration.standardMinutes(1)));
+ tester.processWindowedElement(1, now, firstWindow);
+ tester.processWindowedElement(2, now, firstWindow);
+ BoundedWindow secondWindow = new IntervalWindow(now, now.plus(Duration.standardMinutes(4)));
+ tester.processWindowedElement(3, now, secondWindow);
+ tester.finishBundle();
+
+ assertThat(
+ tester.peekOutputElementsInWindow(firstWindow),
+ containsInAnyOrder(
+ TimestampedValue.of(KV.of(1, firstWindow), now),
+ TimestampedValue.of(KV.of(2, firstWindow), now)));
+ assertThat(
+ tester.peekOutputElementsInWindow(secondWindow),
+ containsInAnyOrder(
+ TimestampedValue.of(KV.of(3, secondWindow), now)));
+ }
+ }
+
+ private static class DoFnWithWindowParameter extends DoFn<Integer, KV<Integer, BoundedWindow>> {
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) {
+ c.output(KV.of(c.element(), window));
+ }
+ }
+
private static class SideInputDoFn extends DoFn<Integer, Integer> {
private final PCollectionView<Integer> value;