You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/06/11 03:19:37 UTC
[incubator-nemo] branch master updated: [NEMO-377] Fix watermark
emission when there are no outputs in GBKWindowTransform (#210)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new c146054 [NEMO-377] Fix watermark emission when there are no outputs in GBKWindowTransform (#210)
c146054 is described below
commit c1460546ab6eef271e54781a54a858bc94bdb39f
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Tue Jun 11 12:19:32 2019 +0900
[NEMO-377] Fix watermark emission when there are no outputs in GBKWindowTransform (#210)
JIRA: [NEMO-377: Fix watermark emission when there are no outputs in GBKWindowTransform](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-377)
**Major changes:**
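- In GroupByKeyAndWindowDoFnTransform, use Long.MIN_VALUE as the minimum watermark hold when data has been received but no per-key watermark holds remain (i.e., there are no pending outputs), instead of always using Long.MAX_VALUE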
---
.../frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java | 5 ++++-
.../beam/transform/GroupByKeyAndWindowDoFnTransformTest.java | 8 +-------
2 files changed, 5 insertions(+), 8 deletions(-)
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index cf75cb4..a7c815f 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -53,6 +53,7 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
private Watermark prevOutputWatermark;
private final Map<K, Watermark> keyAndWatermarkHoldMap;
+ private boolean dataReceived = false;
/**
* GroupByKey constructor.
@@ -123,6 +124,7 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
@Override
public void onData(final WindowedValue<KV<K, InputT>> element) {
checkAndInvokeBundle();
+ dataReceived = true;
// We can call Beam's DoFnRunner#processElement here,
// but it may generate some overheads if we call the method for each data.
@@ -178,7 +180,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
private void emitOutputWatermark(final Watermark inputWatermark) {
// Find min watermark hold
final Watermark minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
- ? new Watermark(Long.MAX_VALUE) // set this to MAX, in order to just use the input watermark.
+ ? new Watermark(dataReceived ? Long.MIN_VALUE : Long.MAX_VALUE)
+ // set this to MAX, in order not to emit input watermark when there are no outputs.
: Collections.min(keyAndWatermarkHoldMap.values());
final Watermark outputWatermarkCandidate = new Watermark(
Math.max(prevOutputWatermark.getTimestamp(),
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
index 4e51525..1af392c 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -173,13 +173,7 @@ public final class GroupByKeyAndWindowDoFnTransformTest {
doFnTransform.onWatermark(watermark2);
assertEquals(0, oc.outputs.size()); // do not emit anything
- assertEquals(1, oc.watermarks.size());
-
- // check output watermark
- assertEquals(1400,
- oc.watermarks.get(0).getTimestamp());
-
- oc.watermarks.clear();
+ assertEquals(0, oc.watermarks.size());
doFnTransform.onData(WindowedValue.of(
KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING));