You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/11/07 05:41:28 UTC
[incubator-nemo] branch master updated: [NEMO-267] Consider watermark holds in GroupByKeyAndWindowDoFnTransform (#151)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new b1dad52 [NEMO-267] Consider watermark holds in GroupByKeyAndWindowDoFnTransform (#151)
b1dad52 is described below
commit b1dad52ca965a6b67bb13d43f5a676eb2ad884d5
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Wed Nov 7 14:41:23 2018 +0900
[NEMO-267] Consider watermark holds in GroupByKeyAndWindowDoFnTransform (#151)
JIRA: [NEMO-267: Consider watermark holds in GroupByKeyAndWindowDoFnTransform](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-267)
**Major changes:**
- Fix `GroupByKeyAndWindowDoFnTransform` to emit output watermarks by holding watermarks for each key
**Tests for the changes:**
- Update `GroupByKeyAndWindowDoFnTransformTest` to use sliding windows and to verify the emitted output watermarks
---
.../apache/nemo/common/punctuation/Watermark.java | 12 ++-
.../beam/transform/AbstractDoFnTransform.java | 9 +-
.../frontend/beam/transform/DoFnTransform.java | 6 ++
.../GroupByKeyAndWindowDoFnTransform.java | 103 ++++++++++++++----
.../GroupByKeyAndWindowDoFnTransformTest.java | 118 ++++++++++++++++-----
5 files changed, 200 insertions(+), 48 deletions(-)
diff --git a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
index 27ee53b..e676e6e 100644
--- a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
@@ -24,7 +24,7 @@ import java.util.Objects;
/**
* Watermark event.
*/
-public final class Watermark implements Serializable {
+public final class Watermark implements Serializable, Comparable<Watermark> {
private final long timestamp;
public Watermark(final long timestamp) {
@@ -48,7 +48,17 @@ public final class Watermark implements Serializable {
}
@Override
+ public String toString() {
+ return String.valueOf(timestamp);
+ }
+
+ @Override
public int hashCode() {
return Objects.hash(timestamp);
}
+
+ @Override
+ public int compareTo(final Watermark o) {
+ return Long.compare(timestamp, o.getTimestamp());
+ }
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 3585ea2..6a8f8d4 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -159,7 +159,7 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
public final void prepare(final Context context, final OutputCollector<WindowedValue<OutputT>> oc) {
// deserialize pipeline option
final NemoPipelineOptions options = serializedOptions.get().as(NemoPipelineOptions.class);
- this.outputCollector = oc;
+ this.outputCollector = wrapOutputCollector(oc);
this.bundleMillis = options.getMaxBundleTimeMills();
this.bundleSize = options.getMaxBundleSize();
@@ -227,6 +227,13 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
*/
abstract DoFn wrapDoFn(final DoFn originalDoFn);
+ /**
+ * An abstract function that wraps the original output collector.
+ * @param oc the original outputCollector.
+ * @return wrapped output collector.
+ */
+ abstract OutputCollector wrapOutputCollector(final OutputCollector oc);
+
@Override
public abstract void onData(final WindowedValue<InputT> data);
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index 433f9df..18368c6 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.punctuation.Watermark;
import java.util.Collection;
@@ -84,6 +85,11 @@ public final class DoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<
}
@Override
+ OutputCollector wrapOutputCollector(final OutputCollector oc) {
+ return oc;
+ }
+
+ @Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("DoTransform:" + getDoFn());
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 3727846..84b6835 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.punctuation.Watermark;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -48,7 +49,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
private final Map<K, List<WindowedValue<InputT>>> keyToValues;
private transient InMemoryTimerInternalsFactory inMemoryTimerInternalsFactory;
private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
- private long currentOutputWatermark;
+ private Watermark prevOutputWatermark;
+ private final Map<K, Watermark> keyAndWatermarkHoldMap;
/**
* GroupByKey constructor.
@@ -70,7 +72,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
options);
this.keyToValues = new HashMap<>();
this.reduceFn = reduceFn;
- this.currentOutputWatermark = Long.MIN_VALUE;
+ this.prevOutputWatermark = new Watermark(Long.MIN_VALUE);
+ this.keyAndWatermarkHoldMap = new HashMap<>();
}
/**
@@ -96,6 +99,11 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
getMainOutputTag());
}
+ @Override
+ OutputCollector wrapOutputCollector(final OutputCollector oc) {
+ return new GBKWOutputCollector(oc);
+ }
+
/**
* It collects data for each key.
* The collected data are emitted at {@link GroupByKeyAndWindowDoFnTransform#onWatermark(Watermark)}
@@ -126,8 +134,6 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
private void processElementsAndTriggerTimers(final Watermark inputWatermark,
final Instant processingTime,
final Instant synchronizedTime) {
- long minOutputTimestampsOfEmittedWindows = Long.MAX_VALUE;
-
for (final Map.Entry<K, List<WindowedValue<InputT>>> entry : keyToValues.entrySet()) {
final K key = entry.getKey();
final List<WindowedValue<InputT>> values = entry.getValue();
@@ -143,20 +149,47 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
}
// Trigger timers
- final long minOutputTimestamp =
- triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
-
- minOutputTimestampsOfEmittedWindows = Math.min(minOutputTimestampsOfEmittedWindows, minOutputTimestamp);
+ triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
// Remove values
values.clear();
}
+ }
- // Emit watermark to downstream operators
- if (minOutputTimestampsOfEmittedWindows != Long.MAX_VALUE
- && currentOutputWatermark < minOutputTimestampsOfEmittedWindows) {
- currentOutputWatermark = minOutputTimestampsOfEmittedWindows;
- getOutputCollector().emitWatermark(new Watermark(minOutputTimestampsOfEmittedWindows));
+ /**
+ * Output watermark
+ * = max(prev output watermark,
+ * min(input watermark, watermark holds)).
+ * @param inputWatermark input watermark
+ */
+ private void emitOutputWatermark(final Watermark inputWatermark) {
+
+ if (keyAndWatermarkHoldMap.isEmpty()) {
+ return;
+ }
+
+ // Find min watermark hold
+ final Watermark minWatermarkHold = Collections.min(keyAndWatermarkHoldMap.values());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Watermark hold: {}, "
+ + "inputWatermark: {}, outputWatermark: {}", minWatermarkHold, inputWatermark, prevOutputWatermark);
+ }
+
+ final Watermark outputWatermarkCandidate = new Watermark(
+ Math.max(prevOutputWatermark.getTimestamp(),
+ Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp())));
+
+ if (outputWatermarkCandidate.getTimestamp() > prevOutputWatermark.getTimestamp()) {
+ // progress!
+ prevOutputWatermark = outputWatermarkCandidate;
+ // emit watermark
+ getOutputCollector().emitWatermark(outputWatermarkCandidate);
+ // Remove minimum watermark holds
+ if (minWatermarkHold.getTimestamp() == outputWatermarkCandidate.getTimestamp()) {
+ keyAndWatermarkHoldMap.entrySet()
+ .removeIf(entry -> entry.getValue().getTimestamp() == minWatermarkHold.getTimestamp());
+ }
}
}
@@ -164,6 +197,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
public void onWatermark(final Watermark inputWatermark) {
checkAndInvokeBundle();
processElementsAndTriggerTimers(inputWatermark, Instant.now(), Instant.now());
+ // Emit watermark to downstream operators
+ emitOutputWatermark(inputWatermark);
checkAndFinishBundle();
}
@@ -176,6 +211,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
// Finish any pending windows by advancing the input watermark to infinity.
processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+ // Emit watermark to downstream operators
+ emitOutputWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
}
/**
@@ -185,10 +222,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
* @param watermark watermark
* @param processingTime processing time
* @param synchronizedTime synchronized time
- * @return the minimum output timestamp.
- * If no data is emitted, it returns Long.MAX_VALUE.
*/
- private long triggerTimers(final K key,
+ private void triggerTimers(final K key,
final Watermark watermark,
final Instant processingTime,
final Instant synchronizedTime) {
@@ -204,10 +239,7 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
final List<TimerInternals.TimerData> timerDataList = getEligibleTimers(timerInternals);
- if (timerDataList.isEmpty()) {
- return Long.MAX_VALUE;
- } else {
-
+ if (!timerDataList.isEmpty()) {
// Trigger timers and emit windowed data
final KeyedWorkItem<K, InputT> timerWorkItem =
KeyedWorkItems.timersWorkItem(key, timerDataList);
@@ -223,8 +255,6 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
}
timerInternals.advanceOutputWatermark(new Instant(keyOutputTimestamp));
-
- return keyOutputTimestamp;
}
}
@@ -320,4 +350,33 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
return stateAndTimerForKey.timerInternals;
}
}
+
+ /**
+ * This class wraps the output collector to track the watermark hold of each key.
+ */
+ final class GBKWOutputCollector implements OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> {
+ private final OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> outputCollector;
+ GBKWOutputCollector(final OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> outputCollector) {
+ this.outputCollector = outputCollector;
+ }
+
+ @Override
+ public void emit(final WindowedValue<KV<K, Iterable<InputT>>> output) {
+ // adds the output timestamp to the watermark hold of each key
+ // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999
+ // TODO #270: consider early firing
+ // TODO #270: This logic may not be applied to early firing outputs
+ keyAndWatermarkHoldMap.put(output.getValue().getKey(),
+ new Watermark(output.getTimestamp().getMillis() + 1));
+ outputCollector.emit(output);
+ }
+ @Override
+ public void emitWatermark(final Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+ @Override
+ public <T> void emit(final String dstVertexId, final T output) {
+ outputCollector.emit(dstVertexId, output);
+ }
+ }
}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
index b7a960f..e3fa23e 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -21,8 +21,7 @@ package org.apache.nemo.compiler.frontend.beam.transform;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
@@ -42,6 +41,7 @@ import static java.util.Collections.emptyList;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
+// TODO #270: Test different triggers
public final class GroupByKeyAndWindowDoFnTransformTest {
private final static Coder NULL_INPUT_CODER = null;
@@ -57,13 +57,19 @@ public final class GroupByKeyAndWindowDoFnTransformTest {
final List<String> expectedValue = new ArrayList<>(expected.getValue());
result.getValue().iterator().forEachRemaining(resultValue::add);
Collections.sort(resultValue);
+ Collections.sort(expectedValue);
assertEquals(expectedValue, resultValue);
}
- // [---- window1 --------] [--------------- window2 ---------------]
- // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- watermark7
+ // window size: 2 sec
+ // interval size: 1 sec
+ //
+ // [--------------window2------------------------------]
+ // [----------------------- window1 --------------------------]
+ // [-------window0-------]
+ // ts1 -- ts2 -- ts3 -- w -- ts4 -- w2 -- ts5 --ts6 --ts7 -- w3 -- ts8 --ts9 - --w4
// (1, "hello")
// (1, "world")
// (2, "hello")
@@ -78,13 +84,15 @@ public final class GroupByKeyAndWindowDoFnTransformTest {
public void test() {
final TupleTag<String> outputTag = new TupleTag<>("main-output");
- final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1));
+ final SlidingWindows slidingWindows = SlidingWindows.of(Duration.standardSeconds(2))
+ .every(Duration.standardSeconds(1));
+
final GroupByKeyAndWindowDoFnTransform<String, String> doFnTransform =
new GroupByKeyAndWindowDoFnTransform(
NULL_OUTPUT_CODERS,
outputTag,
Collections.emptyList(), /* additional outputs */
- WindowingStrategy.of(fixedwindows),
+ WindowingStrategy.of(slidingWindows),
emptyList(), /* side inputs */
PipelineOptionsFactory.as(NemoPipelineOptions.class),
SystemReduceFn.buffering(NULL_INPUT_CODER));
@@ -99,6 +107,25 @@ public final class GroupByKeyAndWindowDoFnTransformTest {
final Instant ts6 = new Instant(1800);
final Instant ts7 = new Instant(1900);
final Watermark watermark3 = new Watermark(2100);
+ final Instant ts8 = new Instant(2200);
+ final Instant ts9 = new Instant(2300);
+ final Watermark watermark4 = new Watermark(3000);
+
+
+ List<IntervalWindow> sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts1));
+ Collections.sort(sortedWindows, IntervalWindow::compareTo);
+
+ // [0---1000)
+ final IntervalWindow window0 = sortedWindows.get(0);
+ // [0---2000)
+ final IntervalWindow window1 = sortedWindows.get(1);
+
+ sortedWindows.clear();
+ sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts4));
+ Collections.sort(sortedWindows, IntervalWindow::compareTo);
+
+ // [1000--3000)
+ final IntervalWindow window2 = sortedWindows.get(1);
final Transform.Context context = mock(Transform.Context.class);
@@ -106,35 +133,41 @@ public final class GroupByKeyAndWindowDoFnTransformTest {
doFnTransform.prepare(context, oc);
doFnTransform.onData(WindowedValue.of(
- KV.of("1", "hello"), ts1, fixedwindows.assignWindow(ts1), PaneInfo.NO_FIRING));
+ KV.of("1", "hello"), ts1, slidingWindows.assignWindows(ts1), PaneInfo.NO_FIRING));
doFnTransform.onData(WindowedValue.of(
- KV.of("1", "world"), ts2, fixedwindows.assignWindow(ts2), PaneInfo.NO_FIRING));
+ KV.of("1", "world"), ts2, slidingWindows.assignWindows(ts2), PaneInfo.NO_FIRING));
doFnTransform.onData(WindowedValue.of(
- KV.of("2", "hello"), ts3, fixedwindows.assignWindow(ts3), PaneInfo.NO_FIRING));
+ KV.of("2", "hello"), ts3, slidingWindows.assignWindows(ts3), PaneInfo.NO_FIRING));
doFnTransform.onWatermark(watermark);
+ // output
+ // 1: ["hello", "world"]
+ // 2: ["hello"]
Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
// windowed result for key 1
- assertEquals(Arrays.asList(fixedwindows.assignWindow(ts1)), oc.outputs.get(0).getWindows());
+ assertEquals(Arrays.asList(window0), oc.outputs.get(0).getWindows());
checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
// windowed result for key 2
- assertEquals(Arrays.asList(fixedwindows.assignWindow(ts3)), oc.outputs.get(1).getWindows());
+ assertEquals(Arrays.asList(window0), oc.outputs.get(1).getWindows());
checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
+ assertEquals(2, oc.outputs.size());
+ assertEquals(1, oc.watermarks.size());
+
// check output watermark
- assertEquals(fixedwindows.assignWindow(ts1).maxTimestamp().getMillis(),
+ assertEquals(1000,
oc.watermarks.get(0).getTimestamp());
oc.outputs.clear();
oc.watermarks.clear();
doFnTransform.onData(WindowedValue.of(
- KV.of("1", "a"), ts4, fixedwindows.assignWindow(ts4), PaneInfo.NO_FIRING));
+ KV.of("1", "a"), ts4, slidingWindows.assignWindows(ts4), PaneInfo.NO_FIRING));
// do not emit anything
doFnTransform.onWatermark(watermark2);
@@ -142,33 +175,70 @@ public final class GroupByKeyAndWindowDoFnTransformTest {
assertEquals(0, oc.watermarks.size());
doFnTransform.onData(WindowedValue.of(
- KV.of("2", "a"), ts5, fixedwindows.assignWindow(ts5), PaneInfo.NO_FIRING));
+ KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING));
doFnTransform.onData(WindowedValue.of(
- KV.of("3", "a"), ts6, fixedwindows.assignWindow(ts6), PaneInfo.NO_FIRING));
+ KV.of("3", "a"), ts6, slidingWindows.assignWindows(ts6), PaneInfo.NO_FIRING));
doFnTransform.onData(WindowedValue.of(
- KV.of("2", "b"), ts7, fixedwindows.assignWindow(ts7), PaneInfo.NO_FIRING));
+ KV.of("3", "b"), ts7, slidingWindows.assignWindows(ts7), PaneInfo.NO_FIRING));
- // emit windowed value
+ // emit window1
doFnTransform.onWatermark(watermark3);
+ // output
+ // 1: ["hello", "world", "a"]
+ // 2: ["hello"]
+ // 3: ["a", "a", "b"]
Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
// windowed result for key 1
- assertEquals(Arrays.asList(fixedwindows.assignWindow(ts4)), oc.outputs.get(0).getWindows());
- checkOutput(KV.of("1", Arrays.asList("a")), oc.outputs.get(0).getValue());
+ assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows());
+ checkOutput(KV.of("1", Arrays.asList("hello", "world", "a")), oc.outputs.get(0).getValue());
// windowed result for key 2
- assertEquals(Arrays.asList(fixedwindows.assignWindow(ts5)), oc.outputs.get(1).getWindows());
- checkOutput(KV.of("2", Arrays.asList("a", "b")), oc.outputs.get(1).getValue());
+ assertEquals(Arrays.asList(window1), oc.outputs.get(1).getWindows());
+ checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
+
+ // windowed result for key 3
+ assertEquals(Arrays.asList(window1), oc.outputs.get(2).getWindows());
+ checkOutput(KV.of("3", Arrays.asList("a", "a", "b")), oc.outputs.get(2).getValue());
+
+ // check output watermark
+ assertEquals(2000,
+ oc.watermarks.get(0).getTimestamp());
+
+ oc.outputs.clear();
+ oc.watermarks.clear();
+
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("1", "a"), ts8, slidingWindows.assignWindows(ts8), PaneInfo.NO_FIRING));
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("3", "b"), ts9, slidingWindows.assignWindows(ts9), PaneInfo.NO_FIRING));
+
+ // emit window2
+ doFnTransform.onWatermark(watermark4);
+
+ // output
+ // 1: ["a", "a"]
+ // 3: ["a", "a", "b", "b"]
+ Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+ assertEquals(2, oc.outputs.size());
+
+ // windowed result for key 1
+ assertEquals(Arrays.asList(window2), oc.outputs.get(0).getWindows());
+ checkOutput(KV.of("1", Arrays.asList("a", "a")), oc.outputs.get(0).getValue());
// windowed result for key 3
- assertEquals(Arrays.asList(fixedwindows.assignWindow(ts6)), oc.outputs.get(2).getWindows());
- checkOutput(KV.of("3", Arrays.asList("a")), oc.outputs.get(2).getValue());
+ assertEquals(Arrays.asList(window2), oc.outputs.get(1).getWindows());
+ checkOutput(KV.of("3", Arrays.asList("a", "a", "b", "b")), oc.outputs.get(1).getValue());
// check output watermark
- assertEquals(fixedwindows.assignWindow(ts4).maxTimestamp().getMillis(),
+ assertEquals(3000,
oc.watermarks.get(0).getTimestamp());
doFnTransform.close();