You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/11/07 05:41:28 UTC

[incubator-nemo] branch master updated: [NEMO-267] Consider watermark holds in GroupByKeyAndWindowDoFnTransform (#151)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new b1dad52  [NEMO-267] Consider watermark holds in GroupByKeyAndWindowDoFnTransform (#151)
b1dad52 is described below

commit b1dad52ca965a6b67bb13d43f5a676eb2ad884d5
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Wed Nov 7 14:41:23 2018 +0900

    [NEMO-267] Consider watermark holds in GroupByKeyAndWindowDoFnTransform (#151)
    
    JIRA: [NEMO-267: Consider watermark holds in GroupByKeyAndWindowDoFnTransform](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-267)
    
    **Major changes:**
    - Fix `GroupByKeyAndWindowDoFnTransform` to emit output watermarks by holding watermarks for each key
    
    **Tests for the changes:**
    - Update `GroupByKeyAndWindowDoFnTransformTest` to use sliding windows
---
 .../apache/nemo/common/punctuation/Watermark.java  |  12 ++-
 .../beam/transform/AbstractDoFnTransform.java      |   9 +-
 .../frontend/beam/transform/DoFnTransform.java     |   6 ++
 .../GroupByKeyAndWindowDoFnTransform.java          | 103 ++++++++++++++----
 .../GroupByKeyAndWindowDoFnTransformTest.java      | 118 ++++++++++++++++-----
 5 files changed, 200 insertions(+), 48 deletions(-)

diff --git a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
index 27ee53b..e676e6e 100644
--- a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
@@ -24,7 +24,7 @@ import java.util.Objects;
 /**
  * Watermark event.
  */
-public final class Watermark implements Serializable {
+public final class Watermark implements Serializable, Comparable<Watermark> {
 
   private final long timestamp;
   public Watermark(final long timestamp) {
@@ -48,7 +48,17 @@ public final class Watermark implements Serializable {
   }
 
   @Override
+  public String toString() {
+    return String.valueOf(timestamp);
+  }
+
+  @Override
   public int hashCode() {
     return Objects.hash(timestamp);
   }
+
+  @Override
+  public int compareTo(final Watermark o) {
+    return Long.compare(timestamp, o.getTimestamp());
+  }
 }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 3585ea2..6a8f8d4 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -159,7 +159,7 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
   public final void prepare(final Context context, final OutputCollector<WindowedValue<OutputT>> oc) {
     // deserialize pipeline option
     final NemoPipelineOptions options = serializedOptions.get().as(NemoPipelineOptions.class);
-    this.outputCollector = oc;
+    this.outputCollector = wrapOutputCollector(oc);
 
     this.bundleMillis = options.getMaxBundleTimeMills();
     this.bundleSize = options.getMaxBundleSize();
@@ -227,6 +227,13 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
    */
   abstract DoFn wrapDoFn(final DoFn originalDoFn);
 
+  /**
+   * An abstract function that wraps the original output collector.
+   * @param oc the original outputCollector.
+   * @return wrapped output collector.
+   */
+  abstract OutputCollector wrapOutputCollector(final OutputCollector oc);
+
   @Override
   public abstract void onData(final WindowedValue<InputT> data);
 
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index 433f9df..18368c6 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.punctuation.Watermark;
 
 import java.util.Collection;
@@ -84,6 +85,11 @@ public final class DoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<
   }
 
   @Override
+  OutputCollector wrapOutputCollector(final OutputCollector oc) {
+    return oc;
+  }
+
+  @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
     sb.append("DoTransform:" + getDoFn());
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 3727846..84b6835 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -48,7 +49,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
   private final Map<K, List<WindowedValue<InputT>>> keyToValues;
   private transient InMemoryTimerInternalsFactory inMemoryTimerInternalsFactory;
   private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
-  private long currentOutputWatermark;
+  private Watermark prevOutputWatermark;
+  private final Map<K, Watermark> keyAndWatermarkHoldMap;
 
   /**
    * GroupByKey constructor.
@@ -70,7 +72,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
       options);
     this.keyToValues = new HashMap<>();
     this.reduceFn = reduceFn;
-    this.currentOutputWatermark = Long.MIN_VALUE;
+    this.prevOutputWatermark = new Watermark(Long.MIN_VALUE);
+    this.keyAndWatermarkHoldMap = new HashMap<>();
   }
 
   /**
@@ -96,6 +99,11 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
         getMainOutputTag());
   }
 
+  @Override
+  OutputCollector wrapOutputCollector(final OutputCollector oc) {
+    return new GBKWOutputCollector(oc);
+  }
+
   /**
    * It collects data for each key.
    * The collected data are emitted at {@link GroupByKeyAndWindowDoFnTransform#onWatermark(Watermark)}
@@ -126,8 +134,6 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
   private void processElementsAndTriggerTimers(final Watermark inputWatermark,
                                                final Instant processingTime,
                                                final Instant synchronizedTime) {
-    long minOutputTimestampsOfEmittedWindows = Long.MAX_VALUE;
-
     for (final Map.Entry<K, List<WindowedValue<InputT>>> entry : keyToValues.entrySet()) {
       final K key = entry.getKey();
       final List<WindowedValue<InputT>> values = entry.getValue();
@@ -143,20 +149,47 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
       }
 
       // Trigger timers
-      final long minOutputTimestamp =
-        triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
-
-      minOutputTimestampsOfEmittedWindows = Math.min(minOutputTimestampsOfEmittedWindows, minOutputTimestamp);
+      triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
 
       // Remove values
       values.clear();
     }
+  }
 
-    // Emit watermark to downstream operators
-    if (minOutputTimestampsOfEmittedWindows != Long.MAX_VALUE
-      && currentOutputWatermark < minOutputTimestampsOfEmittedWindows) {
-      currentOutputWatermark = minOutputTimestampsOfEmittedWindows;
-      getOutputCollector().emitWatermark(new Watermark(minOutputTimestampsOfEmittedWindows));
+  /**
+   * Output watermark
+   * = max(prev output watermark,
+   *          min(input watermark, watermark holds)).
+   * @param inputWatermark input watermark
+   */
+  private void emitOutputWatermark(final Watermark inputWatermark) {
+
+    if (keyAndWatermarkHoldMap.isEmpty()) {
+      return;
+    }
+
+    // Find min watermark hold
+    final Watermark minWatermarkHold = Collections.min(keyAndWatermarkHoldMap.values());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Watermark hold: {}, "
+        + "inputWatermark: {}, outputWatermark: {}", minWatermarkHold, inputWatermark, prevOutputWatermark);
+    }
+
+    final Watermark outputWatermarkCandidate = new Watermark(
+      Math.max(prevOutputWatermark.getTimestamp(),
+        Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp())));
+
+    if (outputWatermarkCandidate.getTimestamp() > prevOutputWatermark.getTimestamp()) {
+      // progress!
+      prevOutputWatermark = outputWatermarkCandidate;
+      // emit watermark
+      getOutputCollector().emitWatermark(outputWatermarkCandidate);
+      // Remove minimum watermark holds
+      if (minWatermarkHold.getTimestamp() == outputWatermarkCandidate.getTimestamp()) {
+        keyAndWatermarkHoldMap.entrySet()
+          .removeIf(entry -> entry.getValue().getTimestamp() == minWatermarkHold.getTimestamp());
+      }
     }
   }
 
@@ -164,6 +197,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
   public void onWatermark(final Watermark inputWatermark) {
     checkAndInvokeBundle();
     processElementsAndTriggerTimers(inputWatermark, Instant.now(), Instant.now());
+    // Emit watermark to downstream operators
+    emitOutputWatermark(inputWatermark);
     checkAndFinishBundle();
   }
 
@@ -176,6 +211,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
     // Finish any pending windows by advancing the input watermark to infinity.
     processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
       BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+    // Emit watermark to downstream operators
+    emitOutputWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
   }
 
   /**
@@ -185,10 +222,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
    * @param watermark watermark
    * @param processingTime processing time
    * @param synchronizedTime synchronized time
-   * @return the minimum output timestamp.
-   * If no data is emitted, it returns Long.MAX_VALUE.
    */
-  private long triggerTimers(final K key,
+  private void triggerTimers(final K key,
                              final Watermark watermark,
                              final Instant processingTime,
                              final Instant synchronizedTime) {
@@ -204,10 +239,7 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
 
     final List<TimerInternals.TimerData> timerDataList = getEligibleTimers(timerInternals);
 
-    if (timerDataList.isEmpty()) {
-      return Long.MAX_VALUE;
-    } else {
-
+    if (!timerDataList.isEmpty()) {
       // Trigger timers and emit windowed data
       final KeyedWorkItem<K, InputT> timerWorkItem =
         KeyedWorkItems.timersWorkItem(key, timerDataList);
@@ -223,8 +255,6 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
       }
 
       timerInternals.advanceOutputWatermark(new Instant(keyOutputTimestamp));
-
-      return keyOutputTimestamp;
     }
   }
 
@@ -320,4 +350,33 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
       return stateAndTimerForKey.timerInternals;
     }
   }
+
+  /**
+   * This class wraps the output collector to track the watermark hold of each key.
+   */
+  final class GBKWOutputCollector implements OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> {
+    private final OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> outputCollector;
+    GBKWOutputCollector(final OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> outputCollector) {
+      this.outputCollector = outputCollector;
+    }
+
+    @Override
+    public void emit(final WindowedValue<KV<K, Iterable<InputT>>> output) {
+      // adds the output timestamp to the watermark hold of each key
+      // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999
+      // TODO #270: consider early firing
+      // TODO #270: This logic may not be applied to early firing outputs
+      keyAndWatermarkHoldMap.put(output.getValue().getKey(),
+        new Watermark(output.getTimestamp().getMillis() + 1));
+      outputCollector.emit(output);
+    }
+    @Override
+    public void emitWatermark(final Watermark watermark) {
+      outputCollector.emitWatermark(watermark);
+    }
+    @Override
+    public <T> void emit(final String dstVertexId, final T output) {
+      outputCollector.emit(dstVertexId, output);
+    }
+  }
 }
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
index b7a960f..e3fa23e 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -21,8 +21,7 @@ package org.apache.nemo.compiler.frontend.beam.transform;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.*;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
@@ -42,6 +41,7 @@ import static java.util.Collections.emptyList;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
+// TODO #270: Test different triggers
 public final class GroupByKeyAndWindowDoFnTransformTest {
 
   private final static Coder NULL_INPUT_CODER = null;
@@ -57,13 +57,19 @@ public final class GroupByKeyAndWindowDoFnTransformTest {
     final List<String> expectedValue = new ArrayList<>(expected.getValue());
     result.getValue().iterator().forEachRemaining(resultValue::add);
     Collections.sort(resultValue);
+    Collections.sort(expectedValue);
 
     assertEquals(expectedValue, resultValue);
   }
 
 
-  // [---- window1 --------]         [--------------- window2 ---------------]
-  // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- watermark7
+  // window size: 2 sec
+  // interval size: 1 sec
+  //
+  //                           [--------------window2------------------------------]
+  // [----------------------- window1 --------------------------]
+  // [-------window0-------]
+  // ts1 -- ts2 -- ts3 -- w -- ts4 -- w2 -- ts5 --ts6 --ts7 -- w3 -- ts8 --ts9 - --w4
   // (1, "hello")
   //      (1, "world")
   //             (2, "hello")
@@ -78,13 +84,15 @@ public final class GroupByKeyAndWindowDoFnTransformTest {
   public void test() {
 
     final TupleTag<String> outputTag = new TupleTag<>("main-output");
-    final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1));
+    final SlidingWindows slidingWindows = SlidingWindows.of(Duration.standardSeconds(2))
+      .every(Duration.standardSeconds(1));
+
     final GroupByKeyAndWindowDoFnTransform<String, String> doFnTransform =
       new GroupByKeyAndWindowDoFnTransform(
         NULL_OUTPUT_CODERS,
         outputTag,
         Collections.emptyList(), /* additional outputs */
-        WindowingStrategy.of(fixedwindows),
+        WindowingStrategy.of(slidingWindows),
         emptyList(), /* side inputs */
         PipelineOptionsFactory.as(NemoPipelineOptions.class),
         SystemReduceFn.buffering(NULL_INPUT_CODER));
@@ -99,6 +107,25 @@ public final class GroupByKeyAndWindowDoFnTransformTest {
     final Instant ts6 = new Instant(1800);
     final Instant ts7 = new Instant(1900);
     final Watermark watermark3 = new Watermark(2100);
+    final Instant ts8 = new Instant(2200);
+    final Instant ts9 = new Instant(2300);
+    final Watermark watermark4 = new Watermark(3000);
+
+
+    List<IntervalWindow> sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts1));
+    Collections.sort(sortedWindows, IntervalWindow::compareTo);
+
+    // [0---1000)
+    final IntervalWindow window0 = sortedWindows.get(0);
+    // [0---2000)
+    final IntervalWindow window1 = sortedWindows.get(1);
+
+    sortedWindows.clear();
+    sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts4));
+    Collections.sort(sortedWindows, IntervalWindow::compareTo);
+
+    // [1000--3000)
+    final IntervalWindow window2 = sortedWindows.get(1);
 
 
     final Transform.Context context = mock(Transform.Context.class);
@@ -106,35 +133,41 @@ public final class GroupByKeyAndWindowDoFnTransformTest {
     doFnTransform.prepare(context, oc);
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "hello"), ts1, fixedwindows.assignWindow(ts1), PaneInfo.NO_FIRING));
+      KV.of("1", "hello"), ts1, slidingWindows.assignWindows(ts1), PaneInfo.NO_FIRING));
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "world"), ts2, fixedwindows.assignWindow(ts2), PaneInfo.NO_FIRING));
+      KV.of("1", "world"), ts2, slidingWindows.assignWindows(ts2), PaneInfo.NO_FIRING));
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("2", "hello"), ts3, fixedwindows.assignWindow(ts3), PaneInfo.NO_FIRING));
+      KV.of("2", "hello"), ts3, slidingWindows.assignWindows(ts3), PaneInfo.NO_FIRING));
 
     doFnTransform.onWatermark(watermark);
 
+    // output
+    // 1: ["hello", "world"]
+    // 2: ["hello"]
     Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
 
     // windowed result for key 1
-    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts1)), oc.outputs.get(0).getWindows());
+    assertEquals(Arrays.asList(window0), oc.outputs.get(0).getWindows());
     checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
 
     // windowed result for key 2
-    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts3)), oc.outputs.get(1).getWindows());
+    assertEquals(Arrays.asList(window0), oc.outputs.get(1).getWindows());
     checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
 
+    assertEquals(2, oc.outputs.size());
+    assertEquals(1, oc.watermarks.size());
+
     // check output watermark
-    assertEquals(fixedwindows.assignWindow(ts1).maxTimestamp().getMillis(),
+    assertEquals(1000,
       oc.watermarks.get(0).getTimestamp());
 
     oc.outputs.clear();
     oc.watermarks.clear();
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "a"), ts4, fixedwindows.assignWindow(ts4), PaneInfo.NO_FIRING));
+      KV.of("1", "a"), ts4, slidingWindows.assignWindows(ts4), PaneInfo.NO_FIRING));
 
     // do not emit anything
     doFnTransform.onWatermark(watermark2);
@@ -142,33 +175,70 @@ public final class GroupByKeyAndWindowDoFnTransformTest {
     assertEquals(0, oc.watermarks.size());
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("2", "a"), ts5, fixedwindows.assignWindow(ts5), PaneInfo.NO_FIRING));
+      KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING));
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("3", "a"), ts6, fixedwindows.assignWindow(ts6), PaneInfo.NO_FIRING));
+      KV.of("3", "a"), ts6, slidingWindows.assignWindows(ts6), PaneInfo.NO_FIRING));
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("2", "b"), ts7, fixedwindows.assignWindow(ts7), PaneInfo.NO_FIRING));
+      KV.of("3", "b"), ts7, slidingWindows.assignWindows(ts7), PaneInfo.NO_FIRING));
 
-    // emit windowed value
+    // emit window1
     doFnTransform.onWatermark(watermark3);
 
+    // output
+    // 1: ["hello", "world", "a"]
+    // 2: ["hello"]
+    // 3: ["a", "a", "b"]
     Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
 
+
     // windowed result for key 1
-    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts4)), oc.outputs.get(0).getWindows());
-    checkOutput(KV.of("1", Arrays.asList("a")), oc.outputs.get(0).getValue());
+    assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows());
+    checkOutput(KV.of("1", Arrays.asList("hello", "world", "a")), oc.outputs.get(0).getValue());
 
     // windowed result for key 2
-    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts5)), oc.outputs.get(1).getWindows());
-    checkOutput(KV.of("2", Arrays.asList("a", "b")), oc.outputs.get(1).getValue());
+    assertEquals(Arrays.asList(window1), oc.outputs.get(1).getWindows());
+    checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
+
+    // windowed result for key 3
+    assertEquals(Arrays.asList(window1), oc.outputs.get(2).getWindows());
+    checkOutput(KV.of("3", Arrays.asList("a", "a", "b")), oc.outputs.get(2).getValue());
+
+    // check output watermark
+    assertEquals(2000,
+      oc.watermarks.get(0).getTimestamp());
+
+    oc.outputs.clear();
+    oc.watermarks.clear();
+
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "a"), ts8, slidingWindows.assignWindows(ts8), PaneInfo.NO_FIRING));
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("3", "b"), ts9, slidingWindows.assignWindows(ts9), PaneInfo.NO_FIRING));
+
+    // emit window2
+    doFnTransform.onWatermark(watermark4);
+
+    // output
+    // 1: ["a", "a"]
+    // 3: ["a", "a", "b", "b"]
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+    assertEquals(2, oc.outputs.size());
+
+    // windowed result for key 1
+    assertEquals(Arrays.asList(window2), oc.outputs.get(0).getWindows());
+    checkOutput(KV.of("1", Arrays.asList("a", "a")), oc.outputs.get(0).getValue());
 
     // windowed result for key 3
-    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts6)), oc.outputs.get(2).getWindows());
-    checkOutput(KV.of("3", Arrays.asList("a")), oc.outputs.get(2).getValue());
+    assertEquals(Arrays.asList(window2), oc.outputs.get(1).getWindows());
+    checkOutput(KV.of("3", Arrays.asList("a", "a", "b", "b")), oc.outputs.get(1).getValue());
 
     // check output watermark
-    assertEquals(fixedwindows.assignWindow(ts4).maxTimestamp().getMillis(),
+    assertEquals(3000,
       oc.watermarks.get(0).getTimestamp());
 
     doFnTransform.close();