You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/12/04 06:24:33 UTC

[GitHub] johnyangk commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest

johnyangk commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest
URL: https://github.com/apache/incubator-nemo/pull/172#discussion_r238537837
 
 

 ##########
 File path: compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
 ##########
 @@ -248,4 +252,120 @@ public void test() {
 
     doFnTransform.close();
   }
+
+  /**
+   * Test complex triggers that emit early and late firing.
+   */
+  @Test
+  public void eventTimeTriggerTest() {
+    final Duration lateness = Duration.standardSeconds(1);
+    final AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow()
+      // early firing
+      .withEarlyFirings(
+        AfterProcessingTime
+          .pastFirstElementInPane()
+          // early firing 1 sec after receiving an element
+          .plusDelayOf(Duration.millis(1000)))
+      // late firing: Fire on any late data.
+      .withLateFirings(AfterPane.elementCountAtLeast(1));
+
+    final FixedWindows window = (FixedWindows) Window.into(
+      FixedWindows.of(Duration.standardSeconds(5)))
+      // lateness
+      .withAllowedLateness(lateness)
+      .triggering(trigger)
+      .accumulatingFiredPanes().getWindowFn();
+
+    final TupleTag<String> outputTag = new TupleTag<>("main-output");
+    final GroupByKeyAndWindowDoFnTransform<String, String> doFnTransform =
+      new GroupByKeyAndWindowDoFnTransform(
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        WindowingStrategy.of(window).withTrigger(trigger)
+          .withMode(ACCUMULATING_FIRED_PANES)
+        .withAllowedLateness(lateness),
+        PipelineOptionsFactory.as(NemoPipelineOptions.class),
+        SystemReduceFn.buffering(NULL_INPUT_CODER),
+        DisplayData.none());
+
+
+    final Transform.Context context = mock(Transform.Context.class);
+    final TestOutputCollector<KV<String, Iterable<String>>> oc = new TestOutputCollector();
+    doFnTransform.prepare(context, oc);
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "hello"), new Instant(1), window.assignWindow(new Instant(1)), PaneInfo.NO_FIRING));
+
+    // early firing is not related to the watermark progress
+    doFnTransform.onWatermark(new Watermark(2));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
+    LOG.info("Output: {}", oc.outputs.get(0));
+    oc.outputs.clear();
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "world"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
+    // EARLY firing... waiting >= 1 sec
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    doFnTransform.onWatermark(new Watermark(5));
 
 Review comment:
   I suppose this effectively advances processing time?
   Can you add a comment about that?
   Maybe also check that `oc.outputs` is empty prior to this line?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services