You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/07/07 21:34:57 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #15135: [ [BEAM-10887] Timer clear] Reapply timer clear

lukecwik commented on a change in pull request #15135:
URL: https://github.com/apache/beam/pull/15135#discussion_r665720963



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4851,6 +4892,348 @@ public void onTimer2(
         return PDone.in(input.getPipeline());
       }
     }
+
+    //    @Test
+    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+    //    public void testSetAndClearProcessingTimeTimer() {
+    //
+    //      final String timerId = "processing-timer";
+    //
+    //      DoFn<KV<String, Integer>, Integer> fn =
+    //          new DoFn<KV<String, Integer>, Integer>() {
+    //
+    //            @TimerId(timerId)
+    //            private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+    //
+    //            @ProcessElement
+    //            public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer>
+    // r) {
+    //              timer.offset(Duration.standardSeconds(1)).setRelative();
+    //              timer.clear();
+    //              r.output(3);
+    //            }
+    //
+    //            @OnTimer(timerId)
+    //            public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) {
+    //              r.output(42);
+    //            }
+    //          };
+    //
+    //      TestStream<KV<String, Integer>> stream =
+    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+    //              .addElements(KV.of("hello", 37))
+    //              .advanceProcessingTime(
+    //                  Duration.millis(
+    //                          DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds
+    //                      .plus(Duration.standardMinutes(2)))
+    //              .advanceWatermarkToInfinity();
+    //
+    //      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+    //      PAssert.that(output).containsInAnyOrder(3);
+    //      pipeline.run();
+    //    }
+    //
+    //    @Test
+    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+    //    public void testSetAndClearEventTimeTimer() {
+    //      final String timerId = "event-timer";
+    //
+    //      DoFn<KV<String, Integer>, Integer> fn =
+    //          new DoFn<KV<String, Integer>, Integer>() {
+    //
+    //            @TimerId(timerId)
+    //            private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+    //
+    //            @ProcessElement
+    //            public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer>
+    // r) {
+    //              timer.offset(Duration.standardSeconds(1)).setRelative();
+    //              timer.clear();
+    //              r.output(3);
+    //            }
+    //
+    //            @OnTimer(timerId)
+    //            public void onTimer(OutputReceiver<Integer> r) {
+    //              r.output(42);
+    //            }
+    //          };
+    //
+    //      TestStream<KV<String, Integer>> stream =
+    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+    //              .advanceWatermarkTo(new Instant(0))
+    //              .addElements(KV.of("hello", 37))
+    //              .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1)))
+    //              .advanceWatermarkToInfinity();
+    //
+    //      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+    //      PAssert.that(output).containsInAnyOrder(3);
+    //      pipeline.run();
+    //    }
+    //
+    //    @Test
+    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+    //    public void testClearUnsetProcessingTimeTimer() {
+    //      final String timerId = "processing-timer";
+    //
+    //      DoFn<KV<String, Integer>, Integer> fn =
+    //          new DoFn<KV<String, Integer>, Integer>() {
+    //
+    //            @TimerId(timerId)
+    //            private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+    //
+    //            @ProcessElement
+    //            public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer>
+    // r) {
+    //              timer.clear();
+    //              r.output(3);
+    //            }
+    //
+    //            @OnTimer(timerId)
+    //            public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) {
+    //              r.output(42);
+    //            }
+    //          };
+    //
+    //      TestStream<KV<String, Integer>> stream =
+    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+    //              .addElements(KV.of("hello", 37))
+    //              .advanceProcessingTime(
+    //                  Duration.millis(
+    //                          DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds
+    //                      .plus(Duration.standardMinutes(4)))
+    //              .advanceWatermarkToInfinity();
+    //
+    //      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+    //      PAssert.that(output).containsInAnyOrder(3);
+    //      pipeline.run();
+    //    }
+    //
+    //    @Test
+    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+    //    public void testClearUnsetEventTimeTimer() {
+    //      final String timerId = "event-timer";
+    //
+    //      DoFn<KV<String, Integer>, Integer> fn =
+    //          new DoFn<KV<String, Integer>, Integer>() {
+    //
+    //            @TimerId(timerId)
+    //            private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+    //
+    //            @ProcessElement
+    //            public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer>
+    // r) {
+    //              timer.clear();
+    //              r.output(3);
+    //            }
+    //
+    //            @OnTimer(timerId)
+    //            public void onTimer(OutputReceiver<Integer> r) {
+    //              r.output(42);
+    //            }
+    //          };
+    //
+    //      TestStream<KV<String, Integer>> stream =
+    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+    //              .advanceWatermarkTo(new Instant(0))
+    //              .addElements(KV.of("hello", 37))
+    //              .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1)))
+    //              .advanceWatermarkToInfinity();
+    //
+    //      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+    //      PAssert.that(output).containsInAnyOrder(3);
+    //      pipeline.run();
+    //    }
+    //
+    //    @Test
+    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+    //    public void testClearProcessingTimeTimer() {
+    //      final String timerId = "processing-timer";
+    //      final String clearTimerId = "clear-timer";
+    //
+    //      DoFn<KV<String, Integer>, Integer> fn =
+    //          new DoFn<KV<String, Integer>, Integer>() {
+    //
+    //            @TimerId(timerId)
+    //            private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+    //
+    //            @TimerId(clearTimerId)
+    //            private final TimerSpec clearTimerSpec =
+    // TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+    //
+    //            @ProcessElement
+    //            public void processElement(
+    //                @TimerId(timerId) Timer timer,
+    //                @TimerId(clearTimerId) Timer clearTimer,
+    //                OutputReceiver<Integer> r) {
+    //              timer.offset(Duration.standardSeconds(1)).setRelative();
+    //              clearTimer.offset(Duration.standardSeconds(2)).setRelative();
+    //
+    //              r.output(3);
+    //            }
+    //
+    //            @OnTimer(timerId)
+    //            public void onTimer(
+    //                OutputReceiver<Integer> r, @TimerId(clearTimerId) Timer clearTimer) {
+    //              System.err.println("onTimer");
+    //              r.output(42);
+    //              clearTimer.clear();
+    //            }
+    //
+    //            // This should never fire since we clear the timer in the earlier timer.
+    //            @OnTimer(clearTimerId)
+    //            public void clearTimer(OutputReceiver<Integer> r) {
+    //              System.err.println("clearTimer");
+    //              r.output(43);
+    //            }
+    //          };
+    //
+    //      TestStream<KV<String, Integer>> stream =
+    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+    //              .addElements(KV.of("hello", 37))
+    //              .advanceProcessingTime(
+    //                  Duration.millis(
+    //                          DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds
+    //                      .plus(Duration.standardMinutes(2)))
+    //              .advanceProcessingTime(
+    //                  Duration.millis(
+    //                          DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds
+    //                      .plus(Duration.standardMinutes(4)))
+    //              .advanceWatermarkToInfinity();
+    //
+    //      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+    //      PAssert.that(output).containsInAnyOrder(3, 42);
+    //      pipeline.run();
+    //    }
+
+    //    @Test
+    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+    //    public void testClearEventTimeTimer() {
+    //      final String timerId = "event-timer";
+    //      final String clearTimerId = "clear-timer";
+    //
+    //      DoFn<KV<String, Integer>, Integer> fn =
+    //          new DoFn<KV<String, Integer>, Integer>() {
+    //
+    //            @TimerId(timerId)
+    //            private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+    //
+    //            @TimerId(clearTimerId)
+    //            private final TimerSpec clearSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+    //
+    //            @ProcessElement
+    //            public void processElement(
+    //                @TimerId(timerId) Timer timer,
+    //                @TimerId(clearTimerId) Timer clearTimer,
+    //                OutputReceiver<Integer> r) {
+    //              timer.offset(Duration.standardSeconds(1)).setRelative();
+    //              clearTimer.offset(Duration.standardSeconds(2)).setRelative();
+    //
+    //              r.output(3);
+    //            }
+    //
+    //            @OnTimer(timerId)
+    //            public void onTimer(
+    //                OutputReceiver<Integer> r, @TimerId(clearTimerId) Timer clearTimer) {
+    //              r.output(42);
+    //              clearTimer.clear();
+    //            }
+    //
+    //            // This should never fire since we clear the timer in the earlier timer.
+    //            @OnTimer(clearTimerId)
+    //            public void clearTimer(OutputReceiver<Integer> r) {
+    //              r.output(43);
+    //            }
+    //          };
+    //
+    //      TestStream<KV<String, Integer>> stream =
+    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+    //              .advanceWatermarkTo(new Instant(0))
+    //              .addElements(KV.of("hello", 37))
+    //              .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1)))
+    //              .advanceWatermarkToInfinity();
+    //
+    //      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+    //      PAssert.that(output).containsInAnyOrder(3, 42);
+    //      pipeline.run();
+    //    }
+    //
+    //    @Test
+    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+    //    public void testSetProcessingTimerAfterClear() {
+    //      final String timerId = "processing-timer";
+    //
+    //      DoFn<KV<String, Integer>, Integer> fn =
+    //          new DoFn<KV<String, Integer>, Integer>() {
+    //
+    //            @TimerId(timerId)
+    //            private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+    //
+    //            @ProcessElement
+    //            public void processElement(
+    //                @Element KV<String, Integer> e,
+    //                @TimerId(timerId) Timer timer,
+    //                OutputReceiver<Integer> r) {
+    //              timer.clear();
+    //              timer.offset(Duration.standardSeconds(1)).setRelative();
+    //              r.output(3);
+    //            }
+    //
+    //            @OnTimer(timerId)
+    //            public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) {
+    //              r.output(42);
+    //            }
+    //          };
+    //
+    //      TestStream<KV<String, Integer>> stream =
+    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+    //              .addElements(KV.of("hello", 37), KV.of("hello", 38))
+    //              .advanceProcessingTime(
+    //                  Duration.millis(
+    //                          DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds
+    //                      .plus(Duration.standardMinutes(2)))
+    //              .advanceWatermarkToInfinity();
+    //
+    //      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+    //      PAssert.that(output).containsInAnyOrder(3, 3, 42);
+    //      pipeline.run();
+    //    }
+    //
+    //    @Test
+    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+    //    public void testSetEventTimerAfterClear() {
+    //      final String timerId = "event-timer";
+    //
+    //      DoFn<KV<String, Integer>, Integer> fn =
+    //          new DoFn<KV<String, Integer>, Integer>() {
+    //
+    //            @TimerId(timerId)
+    //            private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+    //
+    //            @ProcessElement
+    //            public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer>

Review comment:
       Did you mean to leave these large sections commented out?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org