You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/08 18:09:40 UTC

[03/13] incubator-beam git commit: Add basic test for timers in ParDoTest

Add basic test for timers in ParDoTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/50ffc7be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/50ffc7be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/50ffc7be

Branch: refs/heads/master
Commit: 50ffc7be7f41a38ce214f30dd76aa56ddbd245aa
Parents: a99dba5
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 20:49:40 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/ParDoTest.java   | 49 ++++++++++++++++++++
 1 file changed, 49 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/50ffc7be/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 41e795e..36666b2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -55,7 +55,9 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.testing.UsesTimersInParDo;
 import org.apache.beam.sdk.transforms.DoFn.OnTimer;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
@@ -69,6 +71,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
@@ -1571,6 +1574,52 @@ public class ParDoTest implements Serializable {
     p.run();
   }
 
+  /**
+   * Tests that an event time timer fires and results in supplementary output.
+   *
+   * <p>This test relies on two properties:
+   *
+   * <ol>
+   * <li>A timer that is set on time should always get a chance to fire. For this to be true, timers
+   *     per-key-and-window must be delivered in order so the timer is not wiped out until the
+   *     window is expired by the runner.
+   * <li>A {@link Create} transform sends its elements on time, and later advances the watermark to
+   *     infinity
+   * </ol>
+   *
+   * <p>Note that {@link TestStream} is not applicable because it requires very special runner hooks
+   * and is only supported by the direct runner.
+   */
+  @Test
+  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  public void testSimpleEventTimeTimer() throws Exception {
+    final String timerId = "foo";
+
+    DoFn<KV<String, Integer>, Integer> fn =
+        new DoFn<KV<String, Integer>, Integer>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+            timer.setForNowPlus(Duration.standardSeconds(1));
+            context.output(3);
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context) {
+            context.output(42);
+          }
+        };
+
+    Pipeline p = TestPipeline.create();
+
+    PCollection<Integer> output = p.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
+    PAssert.that(output).containsInAnyOrder(3, 42);
+    p.run();
+  }
+
   @Test
   public void testWithOutputTagsDisplayData() {
     DoFn<String, String> fn = new DoFn<String, String>() {