You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/11/12 07:05:23 UTC

[incubator-nemo] branch master updated: [NEMO-279] Finish BeamUnboundedSource when emitting TIMESTAMP_MAX_VALUE (#156)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new c5ef7ed  [NEMO-279] Finish BeamUnboundedSource when emitting TIMESTAMP_MAX_VALUE (#156)
c5ef7ed is described below

commit c5ef7ed5874d3ea6e662dde7a917186a8b80a76b
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Mon Nov 12 16:05:17 2018 +0900

    [NEMO-279] Finish BeamUnboundedSource when emitting TIMESTAMP_MAX_VALUE (#156)
    
    JIRA: [NEMO-279: Finish BeamUnboundedSource when emitting TIMESTAMP_MAX_VALUE](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-279)
    
    **Major changes:**
    - Set `isFinished` to true in `BeamUnboundedSourceVertex` once the watermark it emits reaches `TIMESTAMP_MAX_VALUE`.
---
 .../frontend/beam/source/BeamUnboundedSourceVertex.java        | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
index 482dd9d..ad40d1b 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
@@ -20,10 +20,12 @@ package org.apache.nemo.compiler.frontend.beam.source;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.SourceVertex;
+import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,6 +103,7 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
     private UnboundedSource.UnboundedReader<O> reader;
     private boolean isStarted = false;
     private boolean isCurrentAvailable = false;
+    private boolean isFinished = false;
 
     UnboundedSourceReadable(final UnboundedSource<O, M> unboundedSource) {
       this.unboundedSource = unboundedSource;
@@ -138,12 +141,15 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
 
     @Override
     public long readWatermark() {
-      return reader.getWatermark().getMillis();
+      final Instant watermark = reader.getWatermark();
+      // Finish once the watermark reaches (or exceeds) TIMESTAMP_MAX_VALUE
+      isFinished = (watermark.getMillis() >= GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis());
+      return watermark.getMillis();
     }
 
     @Override
     public boolean isFinished() {
-      return false;
+      return isFinished;
     }
 
     @Override