Posted to issues@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/11/02 07:24:00 UTC

[jira] [Work logged] (BEAM-5265) Can not test Timer with processing time domain

     [ https://issues.apache.org/jira/browse/BEAM-5265?focusedWorklogId=161888&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-161888 ]

ASF GitHub Bot logged work on BEAM-5265:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Nov/18 07:23
            Start Date: 02/Nov/18 07:23
    Worklog Time Spent: 10m 
      Work Description: JozoVilcek commented on a change in pull request #6305: [BEAM-5265] Use currentProcessingTime() for onTime with processing time domain
URL: https://github.com/apache/beam/pull/6305#discussion_r230287609
 
 

 ##########
 File path: runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
 ##########
 @@ -198,44 +218,185 @@ public void testFinishBundleExceptionsWrappedAsUserCodeException() {
   }
 
   /**
-   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn}.
+   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn}
+   * on appropriate time domains.
    */
   @Test
-  public void testOnTimerCalled() {
+  @Category(NeedsRunner.class)
+  public void testOnTimerCalledWithGlobalWindow() {
+
+    // TIMESTAMP_MIN_VALUE is the initial processing time used by TestClock
+    Instant currentProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    Instant currentEventTime = new Instant(42);
+
+    TestStream<KV<String, String>> testStream =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+            .advanceWatermarkTo(currentEventTime)
+            .addElements(TimestampedValue.of(KV.of("anyKey", "anyValue"), new Instant(99)))
+            .advanceProcessingTime(DoFnWithTimers.TIMER_OFFSET.plus(1))
+            .advanceWatermarkToInfinity();
+
     WindowFn<?, GlobalWindow> windowFn = new GlobalWindows();
-    DoFnWithTimers<GlobalWindow> fn = new DoFnWithTimers(windowFn.windowCoder());
-    DoFnRunner<String, String> runner =
-        new SimpleDoFnRunner<>(
-            null,
-            fn,
-            NullSideInputReader.empty(),
-            null,
-            null,
-            Collections.emptyList(),
-            mockStepContext,
-            null,
-            Collections.emptyMap(),
-            WindowingStrategy.of(windowFn));
+    DoFnWithTimers<GlobalWindow> fn = new DoFnWithTimers<>(windowFn.windowCoder());
 
-    Instant currentTime = new Instant(42);
-    Duration offset = Duration.millis(37);
+    PCollection<TimerData> output =
+        pipeline
+            .apply(testStream)
+            .apply(Window.into(new GlobalWindows()))
+            .apply(ParDo.of(fn))
+            .setCoder(TimerInternals.TimerDataCoder.of(windowFn.windowCoder()));
 
-    // Mocking is not easily compatible with annotation analysis, so we manually record
-    // the method call.
-    runner.onTimer(
-        DoFnWithTimers.TIMER_ID,
-        GlobalWindow.INSTANCE,
-        currentTime.plus(offset),
-        TimeDomain.EVENT_TIME);
-
-    assertThat(
-        fn.onTimerInvocations,
-        contains(
+    PAssert.that(output)
+        .containsInAnyOrder(
             TimerData.of(
-                DoFnWithTimers.TIMER_ID,
+                DoFnWithTimers.PROCESSING_TIMER_ID,
                 StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
-                currentTime.plus(offset),
-                TimeDomain.EVENT_TIME)));
+                currentProcessingTime.plus(DoFnWithTimers.TIMER_OFFSET).plus(1),
+                TimeDomain.PROCESSING_TIME),
+            TimerData.of(
+                DoFnWithTimers.EVENT_TIMER_ID,
+                StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
+                currentEventTime.plus(DoFnWithTimers.TIMER_OFFSET),
+                TimeDomain.EVENT_TIME));
+
+    pipeline.run();
+  }
+
+  /**
+   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn}
+   * on appropriate time domains. With {@link IntervalWindow}, we check behavior of emitted events
+   * when time is inside and outside of window boundaries.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testOnTimerCalledWithIntervalWindow() {
+
+    // TIMESTAMP_MIN_VALUE is the initial processing time used by TestClock
+    Instant baseTime = new Instant(0);
+
+    Duration windowDuration = Duration.standardHours(1);
+    Duration windowLateness = Duration.standardMinutes(1);
+    IntervalWindow window = new IntervalWindow(baseTime, windowDuration);
+    FixedWindows windowFn = FixedWindows.of(windowDuration);
+    DoFnWithTimers<IntervalWindow> fn = new DoFnWithTimers<>(windowFn.windowCoder());
+
+    TimestampedValue<KV<String, String>> event =
+        TimestampedValue.of(KV.of("anyKey", "anyValue"), window.start());
+
+    TestStream<KV<String, String>> testStream =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+            // watermark in window, processing time far behind
+            .advanceWatermarkTo(window.start())
+            .addElements(event)
+            .advanceProcessingTime(DoFnWithTimers.TIMER_OFFSET.plus(1))
+            .advanceWatermarkTo(window.start().plus(DoFnWithTimers.TIMER_OFFSET).plus(1))
+
+            // watermark and processing time within window
+            .advanceProcessingTime(
+                Duration.millis(Math.abs(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis())))
+            .addElements(event)
+            .advanceProcessingTime(DoFnWithTimers.TIMER_OFFSET.plus(1))
+            .advanceWatermarkTo(
+                window.start().plus(2 * DoFnWithTimers.TIMER_OFFSET.getMillis()).plus(2))
+
+            // watermark in window, processing time ahead of window.end() but within lateness
+            .advanceProcessingTime(windowDuration)
+            .addElements(event)
+            .advanceProcessingTime(DoFnWithTimers.TIMER_OFFSET.plus(1))
+            .advanceWatermarkTo(
+                window.start().plus(3 * DoFnWithTimers.TIMER_OFFSET.getMillis()).plus(3))
+
+            // watermark in window, processing time is out of window's allowed lateness
 
 Review comment:
   It seems `SimpleDoFnRunner` only materializes the windowed element with whatever context it has and forwards it to the `OutputManager`:
   https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L309
   Is the `OutputManager` the one that should do the dropping, or does it need to happen in `SimpleDoFnRunner`?
   There are quite a few `OutputManager` implementations, so I guess it is not the `OutputManager`.
   



Issue Time Tracking
-------------------

    Worklog Id:     (was: 161888)
    Time Spent: 3h 20m  (was: 3h 10m)

> Can not test Timer with processing time domain
> ----------------------------------------------
>
>                 Key: BEAM-5265
>                 URL: https://issues.apache.org/jira/browse/BEAM-5265
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-direct
>            Reporter: Jozef Vilcek
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> I have a stateful DoFn which has a timer in the PROCESSING_TIME domain. While writing tests, I noticed that it does not react to `advanceProcessingTime()` on the test stream. The problem seems to be here:
> [https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L260]
> I can only tell that patching this place works for direct runner tests. I am not sure about the broader impact on other runners, since it is in `runner-core`.
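
The behavior the report expects can be illustrated with a toy model of two independent clocks. This is only a sketch under stated assumptions: `TimerDomainSketch`, `PendingTimer`, and every method below are hypothetical names invented for illustration, not Beam APIs and not the code at the linked line. The sketch assumes a timer fires once the clock of its own time domain is strictly past the target time, so advancing processing time should fire a processing-time timer while leaving an event-time timer pending.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model only: none of these names are Beam APIs. */
final class TimerDomainSketch {
  enum Domain { EVENT_TIME, PROCESSING_TIME }

  /** A pending timer with an absolute target time in its own domain. */
  static final class PendingTimer {
    final String id;
    final Domain domain;
    final long fireAtMillis;

    PendingTimer(String id, Domain domain, long fireAtMillis) {
      this.id = id;
      this.domain = domain;
      this.fireAtMillis = fireAtMillis;
    }
  }

  private long watermarkMillis;
  private long processingTimeMillis;
  private final List<PendingTimer> pending = new ArrayList<>();
  private final List<String> fired = new ArrayList<>();

  TimerDomainSketch(long initialWatermarkMillis, long initialProcessingTimeMillis) {
    this.watermarkMillis = initialWatermarkMillis;
    this.processingTimeMillis = initialProcessingTimeMillis;
  }

  void setTimer(String id, Domain domain, long fireAtMillis) {
    pending.add(new PendingTimer(id, domain, fireAtMillis));
  }

  /** Moves the event-time clock forward (never backward), then fires ready timers. */
  void advanceWatermarkTo(long millis) {
    watermarkMillis = Math.max(watermarkMillis, millis);
    fireReady();
  }

  /** Moves the processing-time clock forward by a delta, then fires ready timers. */
  void advanceProcessingTime(long byMillis) {
    processingTimeMillis += byMillis;
    fireReady();
  }

  /** Each timer is compared only against the clock of its own domain. */
  private void fireReady() {
    Iterator<PendingTimer> it = pending.iterator();
    while (it.hasNext()) {
      PendingTimer t = it.next();
      long now = t.domain == Domain.EVENT_TIME ? watermarkMillis : processingTimeMillis;
      if (now > t.fireAtMillis) { // strict comparison is a choice of this sketch
        fired.add(t.id);
        it.remove();
      }
    }
  }

  /** Returns a copy of the ids fired so far, in firing order. */
  List<String> fired() {
    return new ArrayList<>(fired);
  }

  public static void main(String[] args) {
    TimerDomainSketch s = new TimerDomainSketch(0L, 0L);
    s.setTimer("processingTimer", Domain.PROCESSING_TIME, 10L);
    s.setTimer("eventTimer", Domain.EVENT_TIME, 10L);

    s.advanceProcessingTime(11L);
    System.out.println(s.fired()); // [processingTimer]

    s.advanceWatermarkTo(11L);
    System.out.println(s.fired()); // [processingTimer, eventTimer]
  }
}
```

In this model, a timer that is compared against the wrong clock would stay pending no matter how far the other clock advances, which matches the symptom described in the report: the timer does not react to `advanceProcessingTime()`.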


