Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/27 03:42:15 UTC

[GitHub] [beam] je-ik commented on a change in pull request #9190: [BEAM-7520] Fix timer firing order in DirectRunner

je-ik commented on a change in pull request #9190:
URL: https://github.com/apache/beam/pull/9190#discussion_r430680574



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -3488,6 +3499,158 @@ public void onTimer(OutputReceiver<String> r) {
       pipeline.run();
     }
 
+    /** A test that makes sure event-time timers are correctly ordered. */
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      UsesTestStream.class,
+      UsesStatefulParDo.class,
+      UsesStrictTimerOrdering.class
+    })
+    public void testEventTimeTimerOrdering() throws Exception {
+      final int numTestElements = 100;
+      final Instant now = new Instant(1500000000000L);
+      TestStream.Builder<KV<String, String>> builder =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+              .advanceWatermarkTo(new Instant(0));
+
+      for (int i = 0; i < numTestElements; i++) {
+        builder = builder.addElements(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i)));
+        builder = builder.advanceWatermarkTo(now.plus(i / 10 * 10));
+      }
+
+      testEventTimeTimerOrderingWithInputPTransform(
+          now, numTestElements, builder.advanceWatermarkToInfinity());
+    }
+
+    /** A test that makes sure event-time timers are correctly ordered when using the Create transform. */
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      UsesStatefulParDo.class,
+      UsesStrictTimerOrdering.class
+    })
+    public void testEventTimeTimerOrderingWithCreate() throws Exception {
+      final int numTestElements = 100;
+      final Instant now = new Instant(1500000000000L);
+
+      List<TimestampedValue<KV<String, String>>> elements = new ArrayList<>();
+      for (int i = 0; i < numTestElements; i++) {
+        elements.add(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i)));
+      }
+
+      testEventTimeTimerOrderingWithInputPTransform(

Review comment:
       The purpose of the test is to verify that timers fire in timestamp order, not that elements arrive in order. The test already mimics some of the functionality of `@RequiresTimeSortedInput`, which should solve the issue you mention. Passing this test (category `UsesStrictTimerOrdering`) should be a requirement for a runner to be able to support `@RequiresTimeSortedInput`. Have you seen any issues with the test?
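
       To illustrate the property this comment refers to, here is a minimal plain-Java sketch of timers firing in timestamp order regardless of the order in which they were set. It is not Beam code and not how DirectRunner is implemented: `TimerOrderingSketch`, `Timer`, `setTimer`, and `advanceWatermarkTo` are hypothetical names used only for illustration, and the rule "fire when timestamp <= watermark" is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical stand-in for a per-key timer queue; NOT Beam's implementation. */
class TimerOrderingSketch {

  /** A pending event-time timer: an id plus the timestamp it should fire at. */
  static final class Timer {
    final String id;
    final long timestamp;

    Timer(String id, long timestamp) {
      this.id = id;
      this.timestamp = timestamp;
    }
  }

  // Pending timers, ordered by timestamp (earliest first).
  private final PriorityQueue<Timer> pending =
      new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));

  /** Registers a timer; timers may be set in any order. */
  public void setTimer(String id, long timestamp) {
    pending.add(new Timer(id, timestamp));
  }

  /**
   * Fires every timer whose timestamp is at or before the watermark
   * (an assumed, simplified eligibility rule) and returns the fired
   * timestamps in the order they fired.
   */
  public List<Long> advanceWatermarkTo(long watermark) {
    List<Long> fired = new ArrayList<>();
    // Poll one timer at a time so the earliest eligible timer always fires next.
    while (!pending.isEmpty() && pending.peek().timestamp <= watermark) {
      fired.add(pending.poll().timestamp);
    }
    return fired;
  }

  public static void main(String[] args) {
    TimerOrderingSketch timers = new TimerOrderingSketch();
    // Timers are set out of timestamp order on purpose.
    timers.setTimer("c", 30L);
    timers.setTimer("a", 10L);
    timers.setTimer("b", 20L);
    System.out.println(timers.advanceWatermarkTo(20L));
    System.out.println(timers.advanceWatermarkTo(Long.MAX_VALUE));
  }
}
```

       The sketch only checks the order of firing; it says nothing about the order in which elements arrive, which is the distinction the comment above makes.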



