You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/08 18:09:40 UTC
[03/13] incubator-beam git commit: Add basic test for timers in
ParDoTest
Add basic test for timers in ParDoTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/50ffc7be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/50ffc7be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/50ffc7be
Branch: refs/heads/master
Commit: 50ffc7be7f41a38ce214f30dd76aa56ddbd245aa
Parents: a99dba5
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 20:49:40 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/ParDoTest.java | 49 ++++++++++++++++++++
1 file changed, 49 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/50ffc7be/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 41e795e..36666b2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -55,7 +55,9 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.testing.UsesTimersInParDo;
import org.apache.beam.sdk.transforms.DoFn.OnTimer;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.ParDo.Bound;
@@ -69,6 +71,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerSpec;
import org.apache.beam.sdk.util.TimerSpecs;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
@@ -1571,6 +1574,52 @@ public class ParDoTest implements Serializable {
p.run();
}
+ /**
+ * Tests that an event time timer fires and results in supplementary output.
+ *
+ * <p>This test relies on two properties:
+ *
+ * <ol>
+ * <li>A timer that is set on time should always get a chance to fire. For this to be true, timers
+ * per-key-and-window must be delivered in order so the timer is not wiped out until the
+ * window is expired by the runner.
+ * <li>A {@link Create} transform sends its elements on time, and later advances the watermark to
+ * infinity
+ * </ol>
+ *
+ * <p>Note that {@link TestStream} is not applicable because it requires very special runner hooks
+ * and is only supported by the direct runner.
+ */
+ @Test
+ @Category({RunnableOnService.class, UsesTimersInParDo.class})
+ public void testSimpleEventTimeTimer() throws Exception {
+ final String timerId = "foo";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+ timer.setForNowPlus(Duration.standardSeconds(1));
+ context.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OnTimerContext context) {
+ context.output(42);
+ }
+ };
+
+ Pipeline p = TestPipeline.create();
+
+ PCollection<Integer> output = p.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(3, 42);
+ p.run();
+ }
+
@Test
public void testWithOutputTagsDisplayData() {
DoFn<String, String> fn = new DoFn<String, String>() {