You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/10/31 03:22:27 UTC
[incubator-nemo] branch master updated: [NEMO-230] Emit collected
data when receiving watermark in GroupByKeyAndWindowTransform (#135)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 04d6eb4 [NEMO-230] Emit collected data when receiving watermark in GroupByKeyAndWindowTransform (#135)
04d6eb4 is described below
commit 04d6eb45af23d7fae3f69b2f37980a47477e0a56
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Wed Oct 31 12:22:23 2018 +0900
[NEMO-230] Emit collected data when receiving watermark in GroupByKeyAndWindowTransform (#135)
JIRA: [NEMO-230: Emit collected data when receiving watermark in GroupByKeyAndWindowTransform](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-230)
**Major changes:**
- Fix `GroupByKeyAndWindowTransform` to emit collected data whenever it receives a watermark.
**Minor changes to note:**
-
**Tests for the changes:**
- Test whether GroupByKeyAndWindowTransform emits the correct data when it receives watermarks.
**Other comments:**
-
Closes #135
---
.../GroupByKeyAndWindowDoFnTransform.java | 165 ++++++++++++++-----
.../frontend/beam/transform/DoFnTransformTest.java | 35 ----
.../GroupByKeyAndWindowDoFnTransformTest.java | 176 +++++++++++++++++++++
.../beam/transform/TestOutputCollector.java | 69 ++++++++
4 files changed, 374 insertions(+), 71 deletions(-)
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 57475ba..7c44b76 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +46,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
private final SystemReduceFn reduceFn;
private final Map<K, List<WindowedValue<InputT>>> keyToValues;
- private transient InMemoryTimerInternalsFactory timerInternalsFactory;
+ private transient InMemoryTimerInternalsFactory inMemoryTimerInternalsFactory;
+ private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
/**
* GroupByKey constructor.
@@ -76,30 +78,69 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
*/
@Override
protected DoFn wrapDoFn(final DoFn doFn) {
- timerInternalsFactory = new InMemoryTimerInternalsFactory();
+ final Map<K, StateAndTimerForKey> map = new HashMap<>();
+ this.inMemoryStateInternalsFactory = new InMemoryStateInternalsFactory(map);
+ this.inMemoryTimerInternalsFactory = new InMemoryTimerInternalsFactory(map);
+
// This function performs group by key and window operation
return
GroupAlsoByWindowViaWindowSetNewDoFn.create(
getWindowingStrategy(),
- new InMemoryStateInternalsFactory(),
- timerInternalsFactory,
+ inMemoryStateInternalsFactory,
+ inMemoryTimerInternalsFactory,
getSideInputReader(),
reduceFn,
getOutputManager(),
getMainOutputTag());
}
+ /**
+ * Collects data for each key.
+ * The collected data is emitted in {@link GroupByKeyAndWindowDoFnTransform#onWatermark(Watermark)}.
+ * @param element data element
+ */
@Override
public void onData(final WindowedValue<KV<K, InputT>> element) {
+ // We could call Beam's DoFnRunner#processElement here,
+ // but it may incur some overhead if we call the method for every single element.
+ // `processElement` requires an `Iterator` of data, so we emit the buffered data on every watermark.
+ // TODO #250: However, this approach can delay event processing in streaming
+ // TODO #250: if no watermark is triggered for a long time.
final KV<K, InputT> kv = element.getValue();
- keyToValues.putIfAbsent(kv.getKey(), new ArrayList());
+ keyToValues.putIfAbsent(kv.getKey(), new ArrayList<>());
keyToValues.get(kv.getKey()).add(element.withValue(kv.getValue()));
}
+ /**
+ * Process the collected data and trigger timers.
+ * @param watermark current watermark
+ * @param processingTime processing time
+ * @param synchronizedTime synchronized time
+ */
+ private void processElementsAndTriggerTimers(final Watermark watermark,
+ final Instant processingTime,
+ final Instant synchronizedTime) {
+ keyToValues.forEach((key, val) -> {
+ // for each key
+ // Process elements
+ if (!val.isEmpty()) {
+ final KeyedWorkItem<K, InputT> keyedWorkItem =
+ KeyedWorkItems.elementsWorkItem(key, val);
+ // The DoFnRunner interface requires WindowedValue,
+ // but this windowed value is actually not used in the ReduceFnRunner internal.
+ getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+ }
+
+ // Trigger timers
+ triggerTimers(key, watermark, processingTime, synchronizedTime);
+ // Remove values
+ val.clear();
+ });
+ }
+
@Override
public void onWatermark(final Watermark watermark) {
- // TODO #230: Emit collected data when receiving watermark
- // TODO #230: in GroupByKeyAndWindowTransform
+ processElementsAndTriggerTimers(watermark, Instant.now(), Instant.now());
}
/**
@@ -108,33 +149,58 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
*/
@Override
protected void beforeClose() {
- final InMemoryTimerInternals timerInternals = timerInternalsFactory.timerInternals;
- try {
- // Finish any pending windows by advancing the input watermark to infinity.
- timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ // Finish any pending windows by advancing the input watermark to infinity.
+ processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
+ BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+ }
- // Finally, advance the processing time to infinity to fire any timers.
- timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
- timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ /**
+ * Trigger timers for the current key.
+ * When triggering, it emits the windowed data to downstream operators.
+ * @param key key
+ * @param watermark watermark
+ * @param processingTime processing time
+ * @param synchronizedTime synchronized time
+ */
+ private void triggerTimers(final K key,
+ final Watermark watermark,
+ final Instant processingTime,
+ final Instant synchronizedTime) {
+ final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
+ inMemoryTimerInternalsFactory.timerInternalsForKey(key);
+ try {
+ timerInternals.advanceInputWatermark(new Instant(watermark.getTimestamp()));
+ timerInternals.advanceProcessingTime(processingTime);
+ timerInternals.advanceSynchronizedProcessingTime(synchronizedTime);
} catch (final Exception e) {
throw new RuntimeException(e);
}
- if (keyToValues.isEmpty()) {
- LOG.warn("Beam GroupByKeyAndWindowDoFnTransform received no data!");
- } else {
- // timer
- final Iterable<TimerInternals.TimerData> timerData = getTimers(timerInternals);
+ final List<TimerInternals.TimerData> timerDataList = getEligibleTimers(timerInternals);
- keyToValues.entrySet().stream().forEach(entry -> {
- // The GroupAlsoByWindowViaWindowSetNewDoFn requires KeyedWorkItem,
- // so we convert the KV to KeyedWorkItem
- final KeyedWorkItem<K, InputT> keyedWorkItem =
- KeyedWorkItems.workItem(entry.getKey(), timerData, entry.getValue());
- getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
- });
- keyToValues.clear();
+ if (timerDataList.isEmpty()) {
+ return;
+ }
+
+ // Trigger timers and emit windowed data
+ final KeyedWorkItem<K, InputT> timerWorkItem =
+ KeyedWorkItems.timersWorkItem(key, timerDataList);
+ // The DoFnRunner interface requires WindowedValue,
+ // but this windowed value is actually not used in the ReduceFnRunner internal.
+ getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
+
+ // output watermark
+ // we set output watermark to the minimum of the timer data
+ long outputWatermark = Long.MAX_VALUE;
+ for (final TimerInternals.TimerData timer : timerDataList) {
+ if (outputWatermark > timer.getTimestamp().getMillis()) {
+ outputWatermark = timer.getTimestamp().getMillis();
+ }
}
+
+ // Emit watermark to downstream operators
+ timerInternals.advanceOutputWatermark(new Instant(outputWatermark));
+ getOutputCollector().emitWatermark(new Watermark(outputWatermark));
}
@Override
@@ -144,7 +210,10 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
return sb.toString();
}
- private Iterable<TimerInternals.TimerData> getTimers(final InMemoryTimerInternals timerInternals) {
+ /**
+ * Get timer data.
+ */
+ private List<TimerInternals.TimerData> getEligibleTimers(final InMemoryTimerInternals timerInternals) {
final List<TimerInternals.TimerData> timerData = new LinkedList<>();
while (true) {
@@ -172,18 +241,37 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
}
/**
+ * State and timer internal.
+ */
+ final class StateAndTimerForKey {
+ private StateInternals stateInternals;
+ private TimerInternals timerInternals;
+
+ StateAndTimerForKey(final StateInternals stateInternals,
+ final TimerInternals timerInternals) {
+ this.stateInternals = stateInternals;
+ this.timerInternals = timerInternals;
+ }
+ }
+
+ /**
* InMemoryStateInternalsFactory.
*/
final class InMemoryStateInternalsFactory implements StateInternalsFactory<K> {
- private final InMemoryStateInternals inMemoryStateInternals;
+ private final Map<K, StateAndTimerForKey> map;
- InMemoryStateInternalsFactory() {
- this.inMemoryStateInternals = InMemoryStateInternals.forKey(null);
+ InMemoryStateInternalsFactory(final Map<K, StateAndTimerForKey> map) {
+ this.map = map;
}
@Override
public StateInternals stateInternalsForKey(final K key) {
- return inMemoryStateInternals;
+ map.putIfAbsent(key, new StateAndTimerForKey(InMemoryStateInternals.forKey(key), null));
+ final StateAndTimerForKey stateAndTimerForKey = map.get(key);
+ if (stateAndTimerForKey.stateInternals == null) {
+ stateAndTimerForKey.stateInternals = InMemoryStateInternals.forKey(key);
+ }
+ return stateAndTimerForKey.stateInternals;
}
}
@@ -191,15 +279,20 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
* InMemoryTimerInternalsFactory.
*/
final class InMemoryTimerInternalsFactory implements TimerInternalsFactory<K> {
- private final InMemoryTimerInternals timerInternals;
+ private final Map<K, StateAndTimerForKey> map;
- InMemoryTimerInternalsFactory() {
- this.timerInternals = new InMemoryTimerInternals();
+ InMemoryTimerInternalsFactory(final Map<K, StateAndTimerForKey> map) {
+ this.map = map;
}
@Override
public TimerInternals timerInternalsForKey(final K key) {
- return timerInternals;
+ map.putIfAbsent(key, new StateAndTimerForKey(null, new InMemoryTimerInternals()));
+ final StateAndTimerForKey stateAndTimerForKey = map.get(key);
+ if (stateAndTimerForKey.timerInternals == null) {
+ stateAndTimerForKey.timerInternals = new InMemoryTimerInternals();
+ }
+ return stateAndTimerForKey.timerInternals;
}
}
}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index 8189a48..b2fbaeb 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -194,41 +194,6 @@ public final class DoFnTransformTest {
doFnTransform.close();
}
- private static final class TestOutputCollector<T> implements OutputCollector<WindowedValue<T>> {
- private final List<WindowedValue<T>> outputs;
- private final List<Tuple<String, WindowedValue<T>>> taggedOutputs;
-
- TestOutputCollector() {
- this.outputs = new LinkedList<>();
- this.taggedOutputs = new LinkedList<>();
- }
-
- @Override
- public void emit(WindowedValue<T> output) {
- outputs.add(output);
- }
-
- @Override
- public void emitWatermark(final Watermark watermark) {
- // do nothing
- }
-
- @Override
- public <O> void emit(String dstVertexId, O output) {
- final WindowedValue<T> val = (WindowedValue<T>) output;
- final Tuple<String, WindowedValue<T>> tuple = new Tuple<>(dstVertexId, val);
- taggedOutputs.add(tuple);
- }
-
- public List<WindowedValue<T>> getOutput() {
- return outputs;
- }
-
- public List<Tuple<String, WindowedValue<T>>> getTaggedOutputs() {
- return taggedOutputs;
- }
- }
-
/**
* Identitiy do fn.
* @param <T> type
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
new file mode 100644
index 0000000..b7a960f
--- /dev/null
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.reef.io.Tuple;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+import java.util.*;
+
+import static java.util.Collections.emptyList;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public final class GroupByKeyAndWindowDoFnTransformTest {
+
+ private final static Coder NULL_INPUT_CODER = null;
+ private final static Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null;
+
+ private void checkOutput(final KV<String, List<String>> expected, final KV<String, Iterable<String>> result) {
+
+ // check key
+ assertEquals(expected.getKey(), result.getKey());
+
+ // check value
+ final List<String> resultValue = new ArrayList<>();
+ final List<String> expectedValue = new ArrayList<>(expected.getValue());
+ result.getValue().iterator().forEachRemaining(resultValue::add);
+ Collections.sort(resultValue);
+
+ assertEquals(expectedValue, resultValue);
+ }
+
+
+ // [---- window1 --------] [--------------- window2 ---------------]
+ // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- watermark3
+ // (1, "hello")
+ // (1, "world")
+ // (2, "hello")
+ // ==> window1: {(1,["hello","world"]), (2, ["hello"])}
+ // (1, "a")
+ // (2,"a")
+ // (3,"a")
+ // (2,"b")
+ // => window2: {(1,"a"), (2,["a","b"]), (3,"a")}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void test() {
+
+ final TupleTag<String> outputTag = new TupleTag<>("main-output");
+ final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1));
+ final GroupByKeyAndWindowDoFnTransform<String, String> doFnTransform =
+ new GroupByKeyAndWindowDoFnTransform(
+ NULL_OUTPUT_CODERS,
+ outputTag,
+ Collections.emptyList(), /* additional outputs */
+ WindowingStrategy.of(fixedwindows),
+ emptyList(), /* side inputs */
+ PipelineOptionsFactory.as(NemoPipelineOptions.class),
+ SystemReduceFn.buffering(NULL_INPUT_CODER));
+
+ final Instant ts1 = new Instant(1);
+ final Instant ts2 = new Instant(100);
+ final Instant ts3 = new Instant(300);
+ final Watermark watermark = new Watermark(1003);
+ final Instant ts4 = new Instant(1200);
+ final Watermark watermark2 = new Watermark(1400);
+ final Instant ts5 = new Instant(1600);
+ final Instant ts6 = new Instant(1800);
+ final Instant ts7 = new Instant(1900);
+ final Watermark watermark3 = new Watermark(2100);
+
+
+ final Transform.Context context = mock(Transform.Context.class);
+ final TestOutputCollector<KV<String, Iterable<String>>> oc = new TestOutputCollector();
+ doFnTransform.prepare(context, oc);
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("1", "hello"), ts1, fixedwindows.assignWindow(ts1), PaneInfo.NO_FIRING));
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("1", "world"), ts2, fixedwindows.assignWindow(ts2), PaneInfo.NO_FIRING));
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("2", "hello"), ts3, fixedwindows.assignWindow(ts3), PaneInfo.NO_FIRING));
+
+ doFnTransform.onWatermark(watermark);
+
+ Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+ // windowed result for key 1
+ assertEquals(Arrays.asList(fixedwindows.assignWindow(ts1)), oc.outputs.get(0).getWindows());
+ checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
+
+ // windowed result for key 2
+ assertEquals(Arrays.asList(fixedwindows.assignWindow(ts3)), oc.outputs.get(1).getWindows());
+ checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
+
+ // check output watermark
+ assertEquals(fixedwindows.assignWindow(ts1).maxTimestamp().getMillis(),
+ oc.watermarks.get(0).getTimestamp());
+
+ oc.outputs.clear();
+ oc.watermarks.clear();
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("1", "a"), ts4, fixedwindows.assignWindow(ts4), PaneInfo.NO_FIRING));
+
+ // do not emit anything
+ doFnTransform.onWatermark(watermark2);
+ assertEquals(0, oc.outputs.size());
+ assertEquals(0, oc.watermarks.size());
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("2", "a"), ts5, fixedwindows.assignWindow(ts5), PaneInfo.NO_FIRING));
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("3", "a"), ts6, fixedwindows.assignWindow(ts6), PaneInfo.NO_FIRING));
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("2", "b"), ts7, fixedwindows.assignWindow(ts7), PaneInfo.NO_FIRING));
+
+ // emit windowed value
+ doFnTransform.onWatermark(watermark3);
+
+ Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+ // windowed result for key 1
+ assertEquals(Arrays.asList(fixedwindows.assignWindow(ts4)), oc.outputs.get(0).getWindows());
+ checkOutput(KV.of("1", Arrays.asList("a")), oc.outputs.get(0).getValue());
+
+ // windowed result for key 2
+ assertEquals(Arrays.asList(fixedwindows.assignWindow(ts5)), oc.outputs.get(1).getWindows());
+ checkOutput(KV.of("2", Arrays.asList("a", "b")), oc.outputs.get(1).getValue());
+
+ // windowed result for key 3
+ assertEquals(Arrays.asList(fixedwindows.assignWindow(ts6)), oc.outputs.get(2).getWindows());
+ checkOutput(KV.of("3", Arrays.asList("a")), oc.outputs.get(2).getValue());
+
+ // check output watermark
+ assertEquals(fixedwindows.assignWindow(ts4).maxTimestamp().getMillis(),
+ oc.watermarks.get(0).getTimestamp());
+
+ doFnTransform.close();
+ }
+}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java
new file mode 100644
index 0000000..9abff71
--- /dev/null
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.reef.io.Tuple;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Test output collector that collects data and watermarks.
+ * @param <T> type of the value carried by each collected WindowedValue
+ */
+final class TestOutputCollector<T> implements OutputCollector<WindowedValue<T>> {
+ public final List<WindowedValue<T>> outputs;
+ public final List<Tuple<String, WindowedValue<T>>> taggedOutputs;
+ public final List<Watermark> watermarks;
+
+ TestOutputCollector() {
+ this.outputs = new LinkedList<>();
+ this.taggedOutputs = new LinkedList<>();
+ this.watermarks = new LinkedList<>();
+ }
+
+ @Override
+ public void emit(WindowedValue<T> output) {
+ outputs.add(output);
+ }
+
+ @Override
+ public void emitWatermark(Watermark watermark) {
+ watermarks.add(watermark);
+ }
+
+
+ @Override
+ public <O> void emit(String dstVertexId, O output) {
+ final WindowedValue<T> val = (WindowedValue<T>) output;
+ final Tuple<String, WindowedValue<T>> tuple = new Tuple<>(dstVertexId, val);
+ taggedOutputs.add(tuple);
+ }
+
+ public List<WindowedValue<T>> getOutput() {
+ return outputs;
+ }
+
+ public List<Tuple<String, WindowedValue<T>>> getTaggedOutputs() {
+ return taggedOutputs;
+ }
+}