You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/06/11 03:19:37 UTC

[incubator-nemo] branch master updated: [NEMO-377] Fix watermark emission when there are no outputs in GBKWindowTransform (#210)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new c146054  [NEMO-377] Fix watermark emission when there are no outputs in GBKWindowTransform (#210)
c146054 is described below

commit c1460546ab6eef271e54781a54a858bc94bdb39f
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Tue Jun 11 12:19:32 2019 +0900

    [NEMO-377] Fix watermark emission when there are no outputs in GBKWindowTransform (#210)
    
    JIRA: [NEMO-377: Fix watermark emission when there are no outputs in GBKWindowTransform](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-377)
    
    **Major changes:**
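    - Use `Long.MIN_VALUE` as the minimum watermark hold in GBKWindow when data has been received but no watermark holds are pending (no outputs), so the input watermark is not emitted in that case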
---
 .../frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java | 5 ++++-
 .../beam/transform/GroupByKeyAndWindowDoFnTransformTest.java      | 8 +-------
 2 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index cf75cb4..a7c815f 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -53,6 +53,7 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
   private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
   private Watermark prevOutputWatermark;
   private final Map<K, Watermark> keyAndWatermarkHoldMap;
+  private boolean dataReceived = false;
 
   /**
    * GroupByKey constructor.
@@ -123,6 +124,7 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
   @Override
   public void onData(final WindowedValue<KV<K, InputT>> element) {
     checkAndInvokeBundle();
+    dataReceived = true;
 
     // We can call Beam's DoFnRunner#processElement here,
     // but it may generate some overheads if we call the method for each data.
@@ -178,7 +180,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
   private void emitOutputWatermark(final Watermark inputWatermark) {
     // Find min watermark hold
     final Watermark minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
-      ? new Watermark(Long.MAX_VALUE) // set this to MAX, in order to just use the input watermark.
+      ? new Watermark(dataReceived ? Long.MIN_VALUE : Long.MAX_VALUE)
+      // set this to MAX, in order not to emit input watermark when there are no outputs.
       : Collections.min(keyAndWatermarkHoldMap.values());
     final Watermark outputWatermarkCandidate = new Watermark(
       Math.max(prevOutputWatermark.getTimestamp(),
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
index 4e51525..1af392c 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -173,13 +173,7 @@ public final class GroupByKeyAndWindowDoFnTransformTest {
     doFnTransform.onWatermark(watermark2);
 
     assertEquals(0, oc.outputs.size()); // do not emit anything
-    assertEquals(1, oc.watermarks.size());
-
-    // check output watermark
-    assertEquals(1400,
-      oc.watermarks.get(0).getTimestamp());
-
-    oc.watermarks.clear();
+    assertEquals(0, oc.watermarks.size());
 
     doFnTransform.onData(WindowedValue.of(
       KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING));