You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/27 04:10:50 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #9190: [BEAM-7520] Fix timer firing order in DirectRunner

lukecwik commented on a change in pull request #9190:
URL: https://github.com/apache/beam/pull/9190#discussion_r430620228



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -3488,6 +3499,158 @@ public void onTimer(OutputReceiver<String> r) {
       pipeline.run();
     }
 
+    /** A test makes sure that an event time timers are correctly ordered. */
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      UsesTestStream.class,
+      UsesStatefulParDo.class,
+      UsesStrictTimerOrdering.class
+    })
+    public void testEventTimeTimerOrdering() throws Exception {
+      final int numTestElements = 100;
+      final Instant now = new Instant(1500000000000L);
+      TestStream.Builder<KV<String, String>> builder =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+              .advanceWatermarkTo(new Instant(0));
+
+      for (int i = 0; i < numTestElements; i++) {
+        builder = builder.addElements(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i)));
+        builder = builder.advanceWatermarkTo(now.plus(i / 10 * 10));
+      }
+
+      testEventTimeTimerOrderingWithInputPTransform(
+          now, numTestElements, builder.advanceWatermarkToInfinity());
+    }
+
+    /** A test makes sure that an event time timers are correctly ordered using Create transform. */
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      UsesStatefulParDo.class,
+      UsesStrictTimerOrdering.class
+    })
+    public void testEventTimeTimerOrderingWithCreate() throws Exception {
+      final int numTestElements = 100;
+      final Instant now = new Instant(1500000000000L);
+
+      List<TimestampedValue<KV<String, String>>> elements = new ArrayList<>();
+      for (int i = 0; i < numTestElements; i++) {
+        elements.add(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i)));
+      }
+
+      testEventTimeTimerOrderingWithInputPTransform(

Review comment:
       The elements in the PCollection produced by the Create transform are considered unordered so the stateful DoFn could see (dummy, 1) before it sees (dummy, 0). Doesn't the DoFn need to be marked with `@RequiresTimeSortedInput` in this case?

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -3488,6 +3499,158 @@ public void onTimer(OutputReceiver<String> r) {
       pipeline.run();
     }
 
+    /** A test makes sure that an event time timers are correctly ordered. */
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      UsesTestStream.class,
+      UsesStatefulParDo.class,
+      UsesStrictTimerOrdering.class
+    })
+    public void testEventTimeTimerOrdering() throws Exception {
+      final int numTestElements = 100;
+      final Instant now = new Instant(1500000000000L);
+      TestStream.Builder<KV<String, String>> builder =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+              .advanceWatermarkTo(new Instant(0));
+
+      for (int i = 0; i < numTestElements; i++) {
+        builder = builder.addElements(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i)));
+        builder = builder.advanceWatermarkTo(now.plus(i / 10 * 10));
+      }
+
+      testEventTimeTimerOrderingWithInputPTransform(
+          now, numTestElements, builder.advanceWatermarkToInfinity());
+    }
+
+    /** A test makes sure that an event time timers are correctly ordered using Create transform. */
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      UsesStatefulParDo.class,
+      UsesStrictTimerOrdering.class
+    })
+    public void testEventTimeTimerOrderingWithCreate() throws Exception {
+      final int numTestElements = 100;
+      final Instant now = new Instant(1500000000000L);
+
+      List<TimestampedValue<KV<String, String>>> elements = new ArrayList<>();
+      for (int i = 0; i < numTestElements; i++) {
+        elements.add(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i)));
+      }
+
+      testEventTimeTimerOrderingWithInputPTransform(

Review comment:
       @je-ik @y1chi @kennknowles 
   The elements in the PCollection produced by the Create transform are considered unordered so the stateful DoFn could see `(dummy, 1)` before it sees `(dummy, 0)`. Doesn't the DoFn need to be marked with `@RequiresTimeSortedInput` in this case?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org