You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/11/12 07:05:23 UTC
[incubator-nemo] branch master updated: [NEMO-279] Finish
BeamUnboundedSource when emitting TIMESTAMP_MAX_VALUE (#156)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new c5ef7ed [NEMO-279] Finish BeamUnboundedSource when emitting TIMESTAMP_MAX_VALUE (#156)
c5ef7ed is described below
commit c5ef7ed5874d3ea6e662dde7a917186a8b80a76b
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Mon Nov 12 16:05:17 2018 +0900
[NEMO-279] Finish BeamUnboundedSource when emitting TIMESTAMP_MAX_VALUE (#156)
JIRA: [NEMO-279: Finish BeamUnboundedSource when emitting TIMESTAMP_MAX_VALUE](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-279)
**Major changes:**
- Set `isFinished` to `true` in `BeamUnboundedSourceVertex` when the watermark read from the source reaches `TIMESTAMP_MAX_VALUE`.
---
.../frontend/beam/source/BeamUnboundedSourceVertex.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
index 482dd9d..ad40d1b 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
@@ -20,10 +20,12 @@ package org.apache.nemo.compiler.frontend.beam.source;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.SourceVertex;
+import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,6 +103,7 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
private UnboundedSource.UnboundedReader<O> reader;
private boolean isStarted = false;
private boolean isCurrentAvailable = false;
+ private boolean isFinished = false;
UnboundedSourceReadable(final UnboundedSource<O, M> unboundedSource) {
this.unboundedSource = unboundedSource;
@@ -138,12 +141,15 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
@Override
public long readWatermark() {
- return reader.getWatermark().getMillis();
+ final Instant watermark = reader.getWatermark();
+ // Finish if the watermark == TIMESTAMP_MAX_VALUE
+ isFinished = (watermark.getMillis() >= GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis());
+ return watermark.getMillis();
}
@Override
public boolean isFinished() {
- return false;
+ return isFinished;
}
@Override