You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/10/31 03:22:25 UTC

[GitHub] johnyangk closed pull request #135: [NEMO-230] Emit collected data when receiving watermark in GroupByKeyAndWindowTransform

johnyangk closed pull request #135: [NEMO-230] Emit collected data when receiving watermark in GroupByKeyAndWindowTransform
URL: https://github.com/apache/incubator-nemo/pull/135
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 57475bad6..7c44b7610 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -29,6 +29,7 @@
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +46,8 @@
 
   private final SystemReduceFn reduceFn;
   private final Map<K, List<WindowedValue<InputT>>> keyToValues;
-  private transient InMemoryTimerInternalsFactory timerInternalsFactory;
+  private transient InMemoryTimerInternalsFactory inMemoryTimerInternalsFactory;
+  private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
 
   /**
    * GroupByKey constructor.
@@ -76,30 +78,69 @@ public GroupByKeyAndWindowDoFnTransform(final Map<TupleTag<?>, Coder<?>> outputC
    */
   @Override
   protected DoFn wrapDoFn(final DoFn doFn) {
-    timerInternalsFactory = new InMemoryTimerInternalsFactory();
+    final Map<K, StateAndTimerForKey> map = new HashMap<>();
+    this.inMemoryStateInternalsFactory = new InMemoryStateInternalsFactory(map);
+    this.inMemoryTimerInternalsFactory = new InMemoryTimerInternalsFactory(map);
+
     // This function performs group by key and window operation
     return
       GroupAlsoByWindowViaWindowSetNewDoFn.create(
         getWindowingStrategy(),
-        new InMemoryStateInternalsFactory(),
-        timerInternalsFactory,
+        inMemoryStateInternalsFactory,
+        inMemoryTimerInternalsFactory,
         getSideInputReader(),
         reduceFn,
         getOutputManager(),
         getMainOutputTag());
   }
 
+  /**
+   * It collects data for each key.
+   * The collected data are emitted at {@link GroupByKeyAndWindowDoFnTransform#onWatermark(Watermark)}
+   * @param element data element
+   */
   @Override
   public void onData(final WindowedValue<KV<K, InputT>> element) {
+    // We can call Beam's DoFnRunner#processElement here,
+    // but calling the method for every single element may add overhead.
+    // `processElement` requires an `Iterator` of data, so we emit the buffered data on every watermark.
+    // TODO #250: But, this approach can delay the event processing in streaming,
+    // TODO #250: if the watermark is not triggered for a long time.
     final KV<K, InputT> kv = element.getValue();
-    keyToValues.putIfAbsent(kv.getKey(), new ArrayList());
+    keyToValues.putIfAbsent(kv.getKey(), new ArrayList<>());
     keyToValues.get(kv.getKey()).add(element.withValue(kv.getValue()));
   }
 
+  /**
+   * Process the collected data and trigger timers.
+   * @param watermark current watermark
+   * @param processingTime processing time
+   * @param synchronizedTime synchronized time
+   */
+  private void processElementsAndTriggerTimers(final Watermark watermark,
+                                               final Instant processingTime,
+                                               final Instant synchronizedTime) {
+    keyToValues.forEach((key, val) -> {
+      // for each key
+      // Process elements
+      if (!val.isEmpty()) {
+        final KeyedWorkItem<K, InputT> keyedWorkItem =
+          KeyedWorkItems.elementsWorkItem(key, val);
+        // The DoFnRunner interface requires WindowedValue,
+        // but this windowed value is actually not used in the ReduceFnRunner internal.
+        getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+      }
+
+      // Trigger timers
+      triggerTimers(key, watermark, processingTime, synchronizedTime);
+      // Remove values
+      val.clear();
+    });
+  }
+
   @Override
   public void onWatermark(final Watermark watermark) {
-    // TODO #230: Emit collected data when receiving watermark
-    // TODO #230: in GroupByKeyAndWindowTransform
+    processElementsAndTriggerTimers(watermark, Instant.now(), Instant.now());
   }
 
   /**
@@ -108,33 +149,58 @@ public void onWatermark(final Watermark watermark) {
    */
   @Override
   protected void beforeClose() {
-    final InMemoryTimerInternals timerInternals = timerInternalsFactory.timerInternals;
-    try {
-      // Finish any pending windows by advancing the input watermark to infinity.
-      timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    // Finish any pending windows by advancing the input watermark to infinity.
+    processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
+      BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+  }
 
-      // Finally, advance the processing time to infinity to fire any timers.
-      timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
-      timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+  /**
+   * Trigger timers for the current key.
+   * When triggering, it emits the windowed data to downstream operators.
+   * @param key key
+   * @param watermark watermark
+   * @param processingTime processing time
+   * @param synchronizedTime synchronized time
+   */
+  private void triggerTimers(final K key,
+                             final Watermark watermark,
+                             final Instant processingTime,
+                             final Instant synchronizedTime) {
+    final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
+      inMemoryTimerInternalsFactory.timerInternalsForKey(key);
+    try {
+      timerInternals.advanceInputWatermark(new Instant(watermark.getTimestamp()));
+      timerInternals.advanceProcessingTime(processingTime);
+      timerInternals.advanceSynchronizedProcessingTime(synchronizedTime);
     } catch (final Exception e) {
       throw new RuntimeException(e);
     }
 
-    if (keyToValues.isEmpty()) {
-      LOG.warn("Beam GroupByKeyAndWindowDoFnTransform received no data!");
-    } else {
-      // timer
-      final Iterable<TimerInternals.TimerData> timerData = getTimers(timerInternals);
+    final List<TimerInternals.TimerData> timerDataList = getEligibleTimers(timerInternals);
 
-      keyToValues.entrySet().stream().forEach(entry -> {
-        // The GroupAlsoByWindowViaWindowSetNewDoFn requires KeyedWorkItem,
-        // so we convert the KV to KeyedWorkItem
-        final KeyedWorkItem<K, InputT> keyedWorkItem =
-          KeyedWorkItems.workItem(entry.getKey(), timerData, entry.getValue());
-        getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
-      });
-      keyToValues.clear();
+    if (timerDataList.isEmpty()) {
+      return;
+    }
+
+    // Trigger timers and emit windowed data
+    final KeyedWorkItem<K, InputT> timerWorkItem =
+      KeyedWorkItems.timersWorkItem(key, timerDataList);
+    // The DoFnRunner interface requires WindowedValue,
+    // but this windowed value is actually not used in the ReduceFnRunner internal.
+    getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
+
+    // output watermark
+    // we set output watermark to the minimum of the timer data
+    long outputWatermark = Long.MAX_VALUE;
+    for (final TimerInternals.TimerData timer : timerDataList) {
+      if (outputWatermark > timer.getTimestamp().getMillis()) {
+        outputWatermark = timer.getTimestamp().getMillis();
+      }
     }
+
+    // Emit watermark to downstream operators
+    timerInternals.advanceOutputWatermark(new Instant(outputWatermark));
+    getOutputCollector().emitWatermark(new Watermark(outputWatermark));
   }
 
   @Override
@@ -144,7 +210,10 @@ public String toString() {
     return sb.toString();
   }
 
-  private Iterable<TimerInternals.TimerData> getTimers(final InMemoryTimerInternals timerInternals) {
+  /**
+   * Get timer data.
+   */
+  private List<TimerInternals.TimerData> getEligibleTimers(final InMemoryTimerInternals timerInternals) {
     final List<TimerInternals.TimerData> timerData = new LinkedList<>();
 
     while (true) {
@@ -171,19 +240,38 @@ public String toString() {
     return timerData;
   }
 
+  /**
+   * State and timer internal.
+   */
+  final class StateAndTimerForKey {
+    private StateInternals stateInternals;
+    private TimerInternals timerInternals;
+
+    StateAndTimerForKey(final StateInternals stateInternals,
+                        final TimerInternals timerInternals) {
+      this.stateInternals = stateInternals;
+      this.timerInternals = timerInternals;
+    }
+  }
+
   /**
    * InMemoryStateInternalsFactory.
    */
   final class InMemoryStateInternalsFactory implements StateInternalsFactory<K> {
-    private final InMemoryStateInternals inMemoryStateInternals;
+    private final Map<K, StateAndTimerForKey> map;
 
-    InMemoryStateInternalsFactory() {
-      this.inMemoryStateInternals = InMemoryStateInternals.forKey(null);
+    InMemoryStateInternalsFactory(final Map<K, StateAndTimerForKey> map) {
+      this.map = map;
     }
 
     @Override
     public StateInternals stateInternalsForKey(final K key) {
-      return inMemoryStateInternals;
+      map.putIfAbsent(key, new StateAndTimerForKey(InMemoryStateInternals.forKey(key), null));
+      final StateAndTimerForKey stateAndTimerForKey = map.get(key);
+      if (stateAndTimerForKey.stateInternals == null) {
+        stateAndTimerForKey.stateInternals = InMemoryStateInternals.forKey(key);
+      }
+      return stateAndTimerForKey.stateInternals;
     }
   }
 
@@ -191,15 +279,20 @@ public StateInternals stateInternalsForKey(final K key) {
    * InMemoryTimerInternalsFactory.
    */
   final class InMemoryTimerInternalsFactory implements TimerInternalsFactory<K> {
-    private final InMemoryTimerInternals timerInternals;
+    private final Map<K, StateAndTimerForKey> map;
 
-    InMemoryTimerInternalsFactory() {
-      this.timerInternals = new InMemoryTimerInternals();
+    InMemoryTimerInternalsFactory(final Map<K, StateAndTimerForKey> map) {
+      this.map = map;
     }
 
     @Override
     public TimerInternals timerInternalsForKey(final K key) {
-      return timerInternals;
+      map.putIfAbsent(key, new StateAndTimerForKey(null, new InMemoryTimerInternals()));
+      final StateAndTimerForKey stateAndTimerForKey = map.get(key);
+      if (stateAndTimerForKey.timerInternals == null) {
+        stateAndTimerForKey.timerInternals = new InMemoryTimerInternals();
+      }
+      return stateAndTimerForKey.timerInternals;
     }
   }
 }
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index 8189a48b4..b2fbaeb15 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -194,41 +194,6 @@ public void testSideInputs() {
     doFnTransform.close();
   }
 
-  private static final class TestOutputCollector<T> implements OutputCollector<WindowedValue<T>> {
-    private final List<WindowedValue<T>> outputs;
-    private final List<Tuple<String, WindowedValue<T>>> taggedOutputs;
-
-    TestOutputCollector() {
-      this.outputs = new LinkedList<>();
-      this.taggedOutputs = new LinkedList<>();
-    }
-
-    @Override
-    public void emit(WindowedValue<T> output) {
-      outputs.add(output);
-    }
-
-    @Override
-    public void emitWatermark(final Watermark watermark) {
-      // do nothing
-    }
-
-    @Override
-    public <O> void emit(String dstVertexId, O output) {
-      final WindowedValue<T> val = (WindowedValue<T>) output;
-      final Tuple<String, WindowedValue<T>> tuple = new Tuple<>(dstVertexId, val);
-      taggedOutputs.add(tuple);
-    }
-
-    public List<WindowedValue<T>> getOutput() {
-      return outputs;
-    }
-
-    public List<Tuple<String, WindowedValue<T>>> getTaggedOutputs() {
-      return taggedOutputs;
-    }
-  }
-
   /**
    * Identitiy do fn.
    * @param <T> type
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
new file mode 100644
index 000000000..b7a960f13
--- /dev/null
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.reef.io.Tuple;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+import java.util.*;
+
+import static java.util.Collections.emptyList;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public final class GroupByKeyAndWindowDoFnTransformTest {
+
+  private final static Coder NULL_INPUT_CODER = null;
+  private final static Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null;
+
+  private void checkOutput(final KV<String, List<String>> expected, final KV<String, Iterable<String>> result) {
+
+    // check key
+    assertEquals(expected.getKey(), result.getKey());
+
+    // check value
+    final List<String> resultValue = new ArrayList<>();
+    final List<String> expectedValue = new ArrayList<>(expected.getValue());
+    result.getValue().iterator().forEachRemaining(resultValue::add);
+    Collections.sort(resultValue);
+
+    assertEquals(expectedValue, resultValue);
+  }
+
+
+  // [---- window1 --------]         [--------------- window2 ---------------]
+  // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- watermark3
+  // (1, "hello")
+  //      (1, "world")
+  //             (2, "hello")
+  //                   ==> window1: {(1,["hello","world"]), (2, ["hello"])}
+  //                                 (1, "a")
+  //                                                       (2,"a")
+  //                                                             (3,"a")
+  //                                                                  (2,"b")
+  //                                                       => window2: {(1,"a"), (2,["a","b"]), (3,"a")}
+  @Test
+  @SuppressWarnings("unchecked")
+  public void test() {
+
+    final TupleTag<String> outputTag = new TupleTag<>("main-output");
+    final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1));
+    final GroupByKeyAndWindowDoFnTransform<String, String> doFnTransform =
+      new GroupByKeyAndWindowDoFnTransform(
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        Collections.emptyList(), /* additional outputs */
+        WindowingStrategy.of(fixedwindows),
+        emptyList(), /* side inputs */
+        PipelineOptionsFactory.as(NemoPipelineOptions.class),
+        SystemReduceFn.buffering(NULL_INPUT_CODER));
+
+    final Instant ts1 = new Instant(1);
+    final Instant ts2 = new Instant(100);
+    final Instant ts3 = new Instant(300);
+    final Watermark watermark = new Watermark(1003);
+    final Instant ts4 = new Instant(1200);
+    final Watermark watermark2 = new Watermark(1400);
+    final Instant ts5 = new Instant(1600);
+    final Instant ts6 = new Instant(1800);
+    final Instant ts7 = new Instant(1900);
+    final Watermark watermark3 = new Watermark(2100);
+
+
+    final Transform.Context context = mock(Transform.Context.class);
+    final TestOutputCollector<KV<String, Iterable<String>>> oc = new TestOutputCollector();
+    doFnTransform.prepare(context, oc);
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "hello"), ts1, fixedwindows.assignWindow(ts1), PaneInfo.NO_FIRING));
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "world"), ts2, fixedwindows.assignWindow(ts2), PaneInfo.NO_FIRING));
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("2", "hello"), ts3, fixedwindows.assignWindow(ts3), PaneInfo.NO_FIRING));
+
+    doFnTransform.onWatermark(watermark);
+
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+    // windowed result for key 1
+    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts1)), oc.outputs.get(0).getWindows());
+    checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
+
+    // windowed result for key 2
+    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts3)), oc.outputs.get(1).getWindows());
+    checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
+
+    // check output watermark
+    assertEquals(fixedwindows.assignWindow(ts1).maxTimestamp().getMillis(),
+      oc.watermarks.get(0).getTimestamp());
+
+    oc.outputs.clear();
+    oc.watermarks.clear();
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "a"), ts4, fixedwindows.assignWindow(ts4), PaneInfo.NO_FIRING));
+
+    // do not emit anything
+    doFnTransform.onWatermark(watermark2);
+    assertEquals(0, oc.outputs.size());
+    assertEquals(0, oc.watermarks.size());
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("2", "a"), ts5, fixedwindows.assignWindow(ts5), PaneInfo.NO_FIRING));
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("3", "a"), ts6, fixedwindows.assignWindow(ts6), PaneInfo.NO_FIRING));
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("2", "b"), ts7, fixedwindows.assignWindow(ts7), PaneInfo.NO_FIRING));
+
+    // emit windowed value
+    doFnTransform.onWatermark(watermark3);
+
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+    // windowed result for key 1
+    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts4)), oc.outputs.get(0).getWindows());
+    checkOutput(KV.of("1", Arrays.asList("a")), oc.outputs.get(0).getValue());
+
+    // windowed result for key 2
+    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts5)), oc.outputs.get(1).getWindows());
+    checkOutput(KV.of("2", Arrays.asList("a", "b")), oc.outputs.get(1).getValue());
+
+    // windowed result for key 3
+    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts6)), oc.outputs.get(2).getWindows());
+    checkOutput(KV.of("3", Arrays.asList("a")), oc.outputs.get(2).getValue());
+
+    // check output watermark
+    assertEquals(fixedwindows.assignWindow(ts4).maxTimestamp().getMillis(),
+      oc.watermarks.get(0).getTimestamp());
+
+    doFnTransform.close();
+  }
+}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java
new file mode 100644
index 000000000..9abff7189
--- /dev/null
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.reef.io.Tuple;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Test output collector that collects data and watermarks.
+ * @param <T> type of the values carried by the collected {@link WindowedValue}s
+ */
+final class TestOutputCollector<T> implements OutputCollector<WindowedValue<T>> {
+  public final List<WindowedValue<T>> outputs;
+  public final List<Tuple<String, WindowedValue<T>>> taggedOutputs;
+  public final List<Watermark> watermarks;
+
+  TestOutputCollector() {
+    this.outputs = new LinkedList<>();
+    this.taggedOutputs = new LinkedList<>();
+    this.watermarks = new LinkedList<>();
+  }
+
+  @Override
+  public void emit(WindowedValue<T> output) {
+      outputs.add(output);
+    }
+
+  @Override
+  public void emitWatermark(Watermark watermark) {
+    watermarks.add(watermark);
+  }
+
+
+  @Override
+  public <O> void emit(String dstVertexId, O output) {
+    final WindowedValue<T> val = (WindowedValue<T>) output;
+    final Tuple<String, WindowedValue<T>> tuple = new Tuple<>(dstVertexId, val);
+    taggedOutputs.add(tuple);
+  }
+
+  public List<WindowedValue<T>> getOutput() {
+    return outputs;
+  }
+
+  public List<Tuple<String, WindowedValue<T>>> getTaggedOutputs() {
+      return taggedOutputs;
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services