You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/10/31 03:22:27 UTC

[incubator-nemo] branch master updated: [NEMO-230] Emit collected data when receiving watermark in GroupByKeyAndWindowTransform (#135)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 04d6eb4  [NEMO-230] Emit collected data when receiving watermark in GroupByKeyAndWindowTransform (#135)
04d6eb4 is described below

commit 04d6eb45af23d7fae3f69b2f37980a47477e0a56
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Wed Oct 31 12:22:23 2018 +0900

    [NEMO-230] Emit collected data when receiving watermark in GroupByKeyAndWindowTransform (#135)
    
    JIRA: [NEMO-230: Emit collected data when receiving watermark in GroupByKeyAndWindowTransform](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-230)
    
    **Major changes:**
    - Fix `GroupByKeyAndWindowTransform` to emit collected data whenever receiving watermarks.
    
    **Minor changes to note:**
    -
    
    **Tests for the changes:**
    - Test whether GroupByKeyAndWindowTransform emits correct data when receiving watermarks.
    
    **Other comments:**
    -
    
    Closes #135
---
 .../GroupByKeyAndWindowDoFnTransform.java          | 165 ++++++++++++++-----
 .../frontend/beam/transform/DoFnTransformTest.java |  35 ----
 .../GroupByKeyAndWindowDoFnTransformTest.java      | 176 +++++++++++++++++++++
 .../beam/transform/TestOutputCollector.java        |  69 ++++++++
 4 files changed, 374 insertions(+), 71 deletions(-)

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 57475ba..7c44b76 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +46,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
 
   private final SystemReduceFn reduceFn;
   private final Map<K, List<WindowedValue<InputT>>> keyToValues;
-  private transient InMemoryTimerInternalsFactory timerInternalsFactory;
+  private transient InMemoryTimerInternalsFactory inMemoryTimerInternalsFactory;
+  private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
 
   /**
    * GroupByKey constructor.
@@ -76,30 +78,69 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
    */
   @Override
   protected DoFn wrapDoFn(final DoFn doFn) {
-    timerInternalsFactory = new InMemoryTimerInternalsFactory();
+    final Map<K, StateAndTimerForKey> map = new HashMap<>();
+    this.inMemoryStateInternalsFactory = new InMemoryStateInternalsFactory(map);
+    this.inMemoryTimerInternalsFactory = new InMemoryTimerInternalsFactory(map);
+
     // This function performs group by key and window operation
     return
       GroupAlsoByWindowViaWindowSetNewDoFn.create(
         getWindowingStrategy(),
-        new InMemoryStateInternalsFactory(),
-        timerInternalsFactory,
+        inMemoryStateInternalsFactory,
+        inMemoryTimerInternalsFactory,
         getSideInputReader(),
         reduceFn,
         getOutputManager(),
         getMainOutputTag());
   }
 
+  /**
+   * It collects data for each key.
+   * The collected data are emitted at {@link GroupByKeyAndWindowDoFnTransform#onWatermark(Watermark)}
+   * @param element data element
+   */
   @Override
   public void onData(final WindowedValue<KV<K, InputT>> element) {
+    // We could call Beam's DoFnRunner#processElement here,
+    // but it may incur overhead if we call the method for every single element.
+    // The `processElement` requires an `Iterator` of data, so we emit the buffered data on every watermark.
+    // TODO #250: But, this approach can delay the event processing in streaming,
+    // TODO #250: if the watermark is not triggered for a long time.
     final KV<K, InputT> kv = element.getValue();
-    keyToValues.putIfAbsent(kv.getKey(), new ArrayList());
+    keyToValues.putIfAbsent(kv.getKey(), new ArrayList<>());
     keyToValues.get(kv.getKey()).add(element.withValue(kv.getValue()));
   }
 
+  /**
+   * Process the collected data and trigger timers.
+   * @param watermark current watermark
+   * @param processingTime processing time
+   * @param synchronizedTime synchronized time
+   */
+  private void processElementsAndTriggerTimers(final Watermark watermark,
+                                               final Instant processingTime,
+                                               final Instant synchronizedTime) {
+    keyToValues.forEach((key, val) -> {
+      // for each key
+      // Process elements
+      if (!val.isEmpty()) {
+        final KeyedWorkItem<K, InputT> keyedWorkItem =
+          KeyedWorkItems.elementsWorkItem(key, val);
+        // The DoFnRunner interface requires WindowedValue,
+        // but this windowed value is actually not used in the ReduceFnRunner internal.
+        getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+      }
+
+      // Trigger timers
+      triggerTimers(key, watermark, processingTime, synchronizedTime);
+      // Remove values
+      val.clear();
+    });
+  }
+
   @Override
   public void onWatermark(final Watermark watermark) {
-    // TODO #230: Emit collected data when receiving watermark
-    // TODO #230: in GroupByKeyAndWindowTransform
+    processElementsAndTriggerTimers(watermark, Instant.now(), Instant.now());
   }
 
   /**
@@ -108,33 +149,58 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
    */
   @Override
   protected void beforeClose() {
-    final InMemoryTimerInternals timerInternals = timerInternalsFactory.timerInternals;
-    try {
-      // Finish any pending windows by advancing the input watermark to infinity.
-      timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    // Finish any pending windows by advancing the input watermark to infinity.
+    processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
+      BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+  }
 
-      // Finally, advance the processing time to infinity to fire any timers.
-      timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
-      timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+  /**
+   * Trigger timers for current key.
+   * When triggering, it emits the windowed data to downstream operators.
+   * @param key key
+   * @param watermark watermark
+   * @param processingTime processing time
+   * @param synchronizedTime synchronized time
+   */
+  private void triggerTimers(final K key,
+                             final Watermark watermark,
+                             final Instant processingTime,
+                             final Instant synchronizedTime) {
+    final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
+      inMemoryTimerInternalsFactory.timerInternalsForKey(key);
+    try {
+      timerInternals.advanceInputWatermark(new Instant(watermark.getTimestamp()));
+      timerInternals.advanceProcessingTime(processingTime);
+      timerInternals.advanceSynchronizedProcessingTime(synchronizedTime);
     } catch (final Exception e) {
       throw new RuntimeException(e);
     }
 
-    if (keyToValues.isEmpty()) {
-      LOG.warn("Beam GroupByKeyAndWindowDoFnTransform received no data!");
-    } else {
-      // timer
-      final Iterable<TimerInternals.TimerData> timerData = getTimers(timerInternals);
+    final List<TimerInternals.TimerData> timerDataList = getEligibleTimers(timerInternals);
 
-      keyToValues.entrySet().stream().forEach(entry -> {
-        // The GroupAlsoByWindowViaWindowSetNewDoFn requires KeyedWorkItem,
-        // so we convert the KV to KeyedWorkItem
-        final KeyedWorkItem<K, InputT> keyedWorkItem =
-          KeyedWorkItems.workItem(entry.getKey(), timerData, entry.getValue());
-        getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
-      });
-      keyToValues.clear();
+    if (timerDataList.isEmpty()) {
+      return;
+    }
+
+    // Trigger timers and emit windowed data
+    final KeyedWorkItem<K, InputT> timerWorkItem =
+      KeyedWorkItems.timersWorkItem(key, timerDataList);
+    // The DoFnRunner interface requires WindowedValue,
+    // but this windowed value is actually not used in the ReduceFnRunner internal.
+    getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
+
+    // Compute the output watermark:
+    // we set it to the minimum timestamp among the timers that were just triggered
+    long outputWatermark = Long.MAX_VALUE;
+    for (final TimerInternals.TimerData timer : timerDataList) {
+      if (outputWatermark > timer.getTimestamp().getMillis()) {
+        outputWatermark = timer.getTimestamp().getMillis();
+      }
     }
+
+    // Emit watermark to downstream operators
+    timerInternals.advanceOutputWatermark(new Instant(outputWatermark));
+    getOutputCollector().emitWatermark(new Watermark(outputWatermark));
   }
 
   @Override
@@ -144,7 +210,10 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
     return sb.toString();
   }
 
-  private Iterable<TimerInternals.TimerData> getTimers(final InMemoryTimerInternals timerInternals) {
+  /**
+   * Get timer data.
+   */
+  private List<TimerInternals.TimerData> getEligibleTimers(final InMemoryTimerInternals timerInternals) {
     final List<TimerInternals.TimerData> timerData = new LinkedList<>();
 
     while (true) {
@@ -172,18 +241,37 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
   }
 
   /**
+   * State and timer internal.
+   */
+  final class StateAndTimerForKey {
+    private StateInternals stateInternals;
+    private TimerInternals timerInternals;
+
+    StateAndTimerForKey(final StateInternals stateInternals,
+                        final TimerInternals timerInternals) {
+      this.stateInternals = stateInternals;
+      this.timerInternals = timerInternals;
+    }
+  }
+
+  /**
    * InMemoryStateInternalsFactory.
    */
   final class InMemoryStateInternalsFactory implements StateInternalsFactory<K> {
-    private final InMemoryStateInternals inMemoryStateInternals;
+    private final Map<K, StateAndTimerForKey> map;
 
-    InMemoryStateInternalsFactory() {
-      this.inMemoryStateInternals = InMemoryStateInternals.forKey(null);
+    InMemoryStateInternalsFactory(final Map<K, StateAndTimerForKey> map) {
+      this.map = map;
     }
 
     @Override
     public StateInternals stateInternalsForKey(final K key) {
-      return inMemoryStateInternals;
+      map.putIfAbsent(key, new StateAndTimerForKey(InMemoryStateInternals.forKey(key), null));
+      final StateAndTimerForKey stateAndTimerForKey = map.get(key);
+      if (stateAndTimerForKey.stateInternals == null) {
+        stateAndTimerForKey.stateInternals = InMemoryStateInternals.forKey(key);
+      }
+      return stateAndTimerForKey.stateInternals;
     }
   }
 
@@ -191,15 +279,20 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
    * InMemoryTimerInternalsFactory.
    */
   final class InMemoryTimerInternalsFactory implements TimerInternalsFactory<K> {
-    private final InMemoryTimerInternals timerInternals;
+    private final Map<K, StateAndTimerForKey> map;
 
-    InMemoryTimerInternalsFactory() {
-      this.timerInternals = new InMemoryTimerInternals();
+    InMemoryTimerInternalsFactory(final Map<K, StateAndTimerForKey> map) {
+      this.map = map;
     }
 
     @Override
     public TimerInternals timerInternalsForKey(final K key) {
-      return timerInternals;
+      map.putIfAbsent(key, new StateAndTimerForKey(null, new InMemoryTimerInternals()));
+      final StateAndTimerForKey stateAndTimerForKey = map.get(key);
+      if (stateAndTimerForKey.timerInternals == null) {
+        stateAndTimerForKey.timerInternals = new InMemoryTimerInternals();
+      }
+      return stateAndTimerForKey.timerInternals;
     }
   }
 }
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index 8189a48..b2fbaeb 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -194,41 +194,6 @@ public final class DoFnTransformTest {
     doFnTransform.close();
   }
 
-  private static final class TestOutputCollector<T> implements OutputCollector<WindowedValue<T>> {
-    private final List<WindowedValue<T>> outputs;
-    private final List<Tuple<String, WindowedValue<T>>> taggedOutputs;
-
-    TestOutputCollector() {
-      this.outputs = new LinkedList<>();
-      this.taggedOutputs = new LinkedList<>();
-    }
-
-    @Override
-    public void emit(WindowedValue<T> output) {
-      outputs.add(output);
-    }
-
-    @Override
-    public void emitWatermark(final Watermark watermark) {
-      // do nothing
-    }
-
-    @Override
-    public <O> void emit(String dstVertexId, O output) {
-      final WindowedValue<T> val = (WindowedValue<T>) output;
-      final Tuple<String, WindowedValue<T>> tuple = new Tuple<>(dstVertexId, val);
-      taggedOutputs.add(tuple);
-    }
-
-    public List<WindowedValue<T>> getOutput() {
-      return outputs;
-    }
-
-    public List<Tuple<String, WindowedValue<T>>> getTaggedOutputs() {
-      return taggedOutputs;
-    }
-  }
-
   /**
    * Identitiy do fn.
    * @param <T> type
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
new file mode 100644
index 0000000..b7a960f
--- /dev/null
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.reef.io.Tuple;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+import java.util.*;
+
+import static java.util.Collections.emptyList;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public final class GroupByKeyAndWindowDoFnTransformTest {
+
+  private final static Coder NULL_INPUT_CODER = null;
+  private final static Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null;
+
+  private void checkOutput(final KV<String, List<String>> expected, final KV<String, Iterable<String>> result) {
+
+    // check key
+    assertEquals(expected.getKey(), result.getKey());
+
+    // check value
+    final List<String> resultValue = new ArrayList<>();
+    final List<String> expectedValue = new ArrayList<>(expected.getValue());
+    result.getValue().iterator().forEachRemaining(resultValue::add);
+    Collections.sort(resultValue);
+
+    assertEquals(expectedValue, resultValue);
+  }
+
+
+  // [---- window1 --------]         [--------------- window2 ---------------]
+  // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- watermark3
+  // (1, "hello")
+  //      (1, "world")
+  //             (2, "hello")
+  //                   ==> window1: {(1,["hello","world"]), (2, ["hello"])}
+  //                                 (1, "a")
+  //                                                       (2,"a")
+  //                                                             (3,"a")
+  //                                                                  (2,"b")
+  //                                                       => window2: {(1,"a"), (2,["a","b"]), (3,"a")}
+  @Test
+  @SuppressWarnings("unchecked")
+  public void test() {
+
+    final TupleTag<String> outputTag = new TupleTag<>("main-output");
+    final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1));
+    final GroupByKeyAndWindowDoFnTransform<String, String> doFnTransform =
+      new GroupByKeyAndWindowDoFnTransform(
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        Collections.emptyList(), /* additional outputs */
+        WindowingStrategy.of(fixedwindows),
+        emptyList(), /* side inputs */
+        PipelineOptionsFactory.as(NemoPipelineOptions.class),
+        SystemReduceFn.buffering(NULL_INPUT_CODER));
+
+    final Instant ts1 = new Instant(1);
+    final Instant ts2 = new Instant(100);
+    final Instant ts3 = new Instant(300);
+    final Watermark watermark = new Watermark(1003);
+    final Instant ts4 = new Instant(1200);
+    final Watermark watermark2 = new Watermark(1400);
+    final Instant ts5 = new Instant(1600);
+    final Instant ts6 = new Instant(1800);
+    final Instant ts7 = new Instant(1900);
+    final Watermark watermark3 = new Watermark(2100);
+
+
+    final Transform.Context context = mock(Transform.Context.class);
+    final TestOutputCollector<KV<String, Iterable<String>>> oc = new TestOutputCollector();
+    doFnTransform.prepare(context, oc);
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "hello"), ts1, fixedwindows.assignWindow(ts1), PaneInfo.NO_FIRING));
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "world"), ts2, fixedwindows.assignWindow(ts2), PaneInfo.NO_FIRING));
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("2", "hello"), ts3, fixedwindows.assignWindow(ts3), PaneInfo.NO_FIRING));
+
+    doFnTransform.onWatermark(watermark);
+
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+    // windowed result for key 1
+    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts1)), oc.outputs.get(0).getWindows());
+    checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
+
+    // windowed result for key 2
+    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts3)), oc.outputs.get(1).getWindows());
+    checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
+
+    // check output watermark
+    assertEquals(fixedwindows.assignWindow(ts1).maxTimestamp().getMillis(),
+      oc.watermarks.get(0).getTimestamp());
+
+    oc.outputs.clear();
+    oc.watermarks.clear();
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "a"), ts4, fixedwindows.assignWindow(ts4), PaneInfo.NO_FIRING));
+
+    // do not emit anything
+    doFnTransform.onWatermark(watermark2);
+    assertEquals(0, oc.outputs.size());
+    assertEquals(0, oc.watermarks.size());
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("2", "a"), ts5, fixedwindows.assignWindow(ts5), PaneInfo.NO_FIRING));
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("3", "a"), ts6, fixedwindows.assignWindow(ts6), PaneInfo.NO_FIRING));
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("2", "b"), ts7, fixedwindows.assignWindow(ts7), PaneInfo.NO_FIRING));
+
+    // emit windowed value
+    doFnTransform.onWatermark(watermark3);
+
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+    // windowed result for key 1
+    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts4)), oc.outputs.get(0).getWindows());
+    checkOutput(KV.of("1", Arrays.asList("a")), oc.outputs.get(0).getValue());
+
+    // windowed result for key 2
+    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts5)), oc.outputs.get(1).getWindows());
+    checkOutput(KV.of("2", Arrays.asList("a", "b")), oc.outputs.get(1).getValue());
+
+    // windowed result for key 3
+    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts6)), oc.outputs.get(2).getWindows());
+    checkOutput(KV.of("3", Arrays.asList("a")), oc.outputs.get(2).getValue());
+
+    // check output watermark
+    assertEquals(fixedwindows.assignWindow(ts4).maxTimestamp().getMillis(),
+      oc.watermarks.get(0).getTimestamp());
+
+    doFnTransform.close();
+  }
+}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java
new file mode 100644
index 0000000..9abff71
--- /dev/null
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.reef.io.Tuple;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Test output collector that collects data and watermarks.
+ * @param <T> type of the value wrapped in each collected {@link WindowedValue}
+ */
+final class TestOutputCollector<T> implements OutputCollector<WindowedValue<T>> {
+  public final List<WindowedValue<T>> outputs;
+  public final List<Tuple<String, WindowedValue<T>>> taggedOutputs;
+  public final List<Watermark> watermarks;
+
+  TestOutputCollector() {
+    this.outputs = new LinkedList<>();
+    this.taggedOutputs = new LinkedList<>();
+    this.watermarks = new LinkedList<>();
+  }
+
+  @Override
+  public void emit(WindowedValue<T> output) {
+      outputs.add(output);
+    }
+
+  @Override
+  public void emitWatermark(Watermark watermark) {
+    watermarks.add(watermark);
+  }
+
+
+  @Override
+  public <O> void emit(String dstVertexId, O output) {
+    final WindowedValue<T> val = (WindowedValue<T>) output;
+    final Tuple<String, WindowedValue<T>> tuple = new Tuple<>(dstVertexId, val);
+    taggedOutputs.add(tuple);
+  }
+
+  public List<WindowedValue<T>> getOutput() {
+    return outputs;
+  }
+
+  public List<Tuple<String, WindowedValue<T>>> getTaggedOutputs() {
+      return taggedOutputs;
+    }
+}