You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/11/07 05:41:25 UTC

[GitHub] johnyangk closed pull request #151: [NEMO-267] Consider watermark holds in GroupByKeyAndWindowDoFnTransform

johnyangk closed pull request #151: [NEMO-267] Consider watermark holds in GroupByKeyAndWindowDoFnTransform
URL: https://github.com/apache/incubator-nemo/pull/151
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
index 27ee53b3f..e676e6e8c 100644
--- a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
@@ -24,7 +24,7 @@
 /**
  * Watermark event.
  */
-public final class Watermark implements Serializable {
+public final class Watermark implements Serializable, Comparable<Watermark> {
 
   private final long timestamp;
   public Watermark(final long timestamp) {
@@ -47,8 +47,18 @@ public boolean equals(final Object o) {
     return timestamp == watermark.timestamp;
   }
 
+  @Override
+  public String toString() {
+    return String.valueOf(timestamp);
+  }
+
   @Override
   public int hashCode() {
     return Objects.hash(timestamp);
   }
+
+  @Override
+  public int compareTo(final Watermark o) {
+    return Long.compare(timestamp, o.getTimestamp());
+  }
 }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 3585ea2fd..6a8f8d49a 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -159,7 +159,7 @@ protected final void checkAndFinishBundle() {
   public final void prepare(final Context context, final OutputCollector<WindowedValue<OutputT>> oc) {
     // deserialize pipeline option
     final NemoPipelineOptions options = serializedOptions.get().as(NemoPipelineOptions.class);
-    this.outputCollector = oc;
+    this.outputCollector = wrapOutputCollector(oc);
 
     this.bundleMillis = options.getMaxBundleTimeMills();
     this.bundleSize = options.getMaxBundleSize();
@@ -227,6 +227,13 @@ public final void close() {
    */
   abstract DoFn wrapDoFn(final DoFn originalDoFn);
 
+  /**
+   * An abstract function that wraps the original output collector.
+   * @param oc the original outputCollector.
+   * @return wrapped output collector.
+   */
+  abstract OutputCollector wrapOutputCollector(final OutputCollector oc);
+
   @Override
   public abstract void onData(final WindowedValue<InputT> data);
 
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index 433f9df1f..18368c601 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -25,6 +25,7 @@
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.punctuation.Watermark;
 
 import java.util.Collection;
@@ -83,6 +84,11 @@ protected void beforeClose() {
     // nothing
   }
 
+  @Override
+  OutputCollector wrapOutputCollector(final OutputCollector oc) {
+    return oc;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 372784629..84b6835b9 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -28,6 +28,7 @@
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -48,7 +49,8 @@
   private final Map<K, List<WindowedValue<InputT>>> keyToValues;
   private transient InMemoryTimerInternalsFactory inMemoryTimerInternalsFactory;
   private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
-  private long currentOutputWatermark;
+  private Watermark prevOutputWatermark;
+  private final Map<K, Watermark> keyAndWatermarkHoldMap;
 
   /**
    * GroupByKey constructor.
@@ -70,7 +72,8 @@ public GroupByKeyAndWindowDoFnTransform(final Map<TupleTag<?>, Coder<?>> outputC
       options);
     this.keyToValues = new HashMap<>();
     this.reduceFn = reduceFn;
-    this.currentOutputWatermark = Long.MIN_VALUE;
+    this.prevOutputWatermark = new Watermark(Long.MIN_VALUE);
+    this.keyAndWatermarkHoldMap = new HashMap<>();
   }
 
   /**
@@ -96,6 +99,11 @@ protected DoFn wrapDoFn(final DoFn doFn) {
         getMainOutputTag());
   }
 
+  @Override
+  OutputCollector wrapOutputCollector(final OutputCollector oc) {
+    return new GBKWOutputCollector(oc);
+  }
+
   /**
    * It collects data for each key.
    * The collected data are emitted at {@link GroupByKeyAndWindowDoFnTransform#onWatermark(Watermark)}
@@ -126,8 +134,6 @@ public void onData(final WindowedValue<KV<K, InputT>> element) {
   private void processElementsAndTriggerTimers(final Watermark inputWatermark,
                                                final Instant processingTime,
                                                final Instant synchronizedTime) {
-    long minOutputTimestampsOfEmittedWindows = Long.MAX_VALUE;
-
     for (final Map.Entry<K, List<WindowedValue<InputT>>> entry : keyToValues.entrySet()) {
       final K key = entry.getKey();
       final List<WindowedValue<InputT>> values = entry.getValue();
@@ -143,20 +149,47 @@ private void processElementsAndTriggerTimers(final Watermark inputWatermark,
       }
 
       // Trigger timers
-      final long minOutputTimestamp =
-        triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
-
-      minOutputTimestampsOfEmittedWindows = Math.min(minOutputTimestampsOfEmittedWindows, minOutputTimestamp);
+      triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
 
       // Remove values
       values.clear();
     }
+  }
 
-    // Emit watermark to downstream operators
-    if (minOutputTimestampsOfEmittedWindows != Long.MAX_VALUE
-      && currentOutputWatermark < minOutputTimestampsOfEmittedWindows) {
-      currentOutputWatermark = minOutputTimestampsOfEmittedWindows;
-      getOutputCollector().emitWatermark(new Watermark(minOutputTimestampsOfEmittedWindows));
+  /**
+   * Output watermark
+   * = max(prev output watermark,
+   *          min(input watermark, watermark holds)).
+   * @param inputWatermark input watermark
+   */
+  private void emitOutputWatermark(final Watermark inputWatermark) {
+
+    if (keyAndWatermarkHoldMap.isEmpty()) {
+      return;
+    }
+
+    // Find min watermark hold
+    final Watermark minWatermarkHold = Collections.min(keyAndWatermarkHoldMap.values());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Watermark hold: {}, "
+        + "inputWatermark: {}, outputWatermark: {}", minWatermarkHold, inputWatermark, prevOutputWatermark);
+    }
+
+    final Watermark outputWatermarkCandidate = new Watermark(
+      Math.max(prevOutputWatermark.getTimestamp(),
+        Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp())));
+
+    if (outputWatermarkCandidate.getTimestamp() > prevOutputWatermark.getTimestamp()) {
+      // progress!
+      prevOutputWatermark = outputWatermarkCandidate;
+      // emit watermark
+      getOutputCollector().emitWatermark(outputWatermarkCandidate);
+      // Remove minimum watermark holds
+      if (minWatermarkHold.getTimestamp() == outputWatermarkCandidate.getTimestamp()) {
+        keyAndWatermarkHoldMap.entrySet()
+          .removeIf(entry -> entry.getValue().getTimestamp() == minWatermarkHold.getTimestamp());
+      }
     }
   }
 
@@ -164,6 +197,8 @@ private void processElementsAndTriggerTimers(final Watermark inputWatermark,
   public void onWatermark(final Watermark inputWatermark) {
     checkAndInvokeBundle();
     processElementsAndTriggerTimers(inputWatermark, Instant.now(), Instant.now());
+    // Emit watermark to downstream operators
+    emitOutputWatermark(inputWatermark);
     checkAndFinishBundle();
   }
 
@@ -176,6 +211,8 @@ protected void beforeClose() {
     // Finish any pending windows by advancing the input watermark to infinity.
     processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
       BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+    // Emit watermark to downstream operators
+    emitOutputWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
   }
 
   /**
@@ -185,10 +222,8 @@ protected void beforeClose() {
    * @param watermark watermark
    * @param processingTime processing time
    * @param synchronizedTime synchronized time
-   * @return the minimum output timestamp.
-   * If no data is emitted, it returns Long.MAX_VALUE.
    */
-  private long triggerTimers(final K key,
+  private void triggerTimers(final K key,
                              final Watermark watermark,
                              final Instant processingTime,
                              final Instant synchronizedTime) {
@@ -204,10 +239,7 @@ private long triggerTimers(final K key,
 
     final List<TimerInternals.TimerData> timerDataList = getEligibleTimers(timerInternals);
 
-    if (timerDataList.isEmpty()) {
-      return Long.MAX_VALUE;
-    } else {
-
+    if (!timerDataList.isEmpty()) {
       // Trigger timers and emit windowed data
       final KeyedWorkItem<K, InputT> timerWorkItem =
         KeyedWorkItems.timersWorkItem(key, timerDataList);
@@ -223,8 +255,6 @@ private long triggerTimers(final K key,
       }
 
       timerInternals.advanceOutputWatermark(new Instant(keyOutputTimestamp));
-
-      return keyOutputTimestamp;
     }
   }
 
@@ -320,4 +350,33 @@ public TimerInternals timerInternalsForKey(final K key) {
       return stateAndTimerForKey.timerInternals;
     }
   }
+
+  /**
+   * This class wraps the output collector to track the watermark hold of each key.
+   */
+  final class GBKWOutputCollector implements OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> {
+    private final OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> outputCollector;
+    GBKWOutputCollector(final OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> outputCollector) {
+      this.outputCollector = outputCollector;
+    }
+
+    @Override
+    public void emit(final WindowedValue<KV<K, Iterable<InputT>>> output) {
+      // adds the output timestamp to the watermark hold of each key
+      // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999
+      // TODO #270: consider early firing
+      // TODO #270: This logic may not be applied to early firing outputs
+      keyAndWatermarkHoldMap.put(output.getValue().getKey(),
+        new Watermark(output.getTimestamp().getMillis() + 1));
+      outputCollector.emit(output);
+    }
+    @Override
+    public void emitWatermark(final Watermark watermark) {
+      outputCollector.emitWatermark(watermark);
+    }
+    @Override
+    public <T> void emit(final String dstVertexId, final T output) {
+      outputCollector.emit(dstVertexId, output);
+    }
+  }
 }
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
index b7a960f13..e3fa23eaa 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -21,8 +21,7 @@
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.*;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
@@ -42,6 +41,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
+// TODO #270: Test different triggers
 public final class GroupByKeyAndWindowDoFnTransformTest {
 
   private final static Coder NULL_INPUT_CODER = null;
@@ -57,13 +57,19 @@ private void checkOutput(final KV<String, List<String>> expected, final KV<Strin
     final List<String> expectedValue = new ArrayList<>(expected.getValue());
     result.getValue().iterator().forEachRemaining(resultValue::add);
     Collections.sort(resultValue);
+    Collections.sort(expectedValue);
 
     assertEquals(expectedValue, resultValue);
   }
 
 
-  // [---- window1 --------]         [--------------- window2 ---------------]
-  // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- watermark7
+  // window size: 2 sec
+  // interval size: 1 sec
+  //
+  //                           [--------------window2------------------------------]
+  // [----------------------- window1 --------------------------]
+  // [-------window0-------]
+  // ts1 -- ts2 -- ts3 -- w -- ts4 -- w2 -- ts5 --ts6 --ts7 -- w3 -- ts8 --ts9 - --w4
   // (1, "hello")
   //      (1, "world")
   //             (2, "hello")
@@ -78,13 +84,15 @@ private void checkOutput(final KV<String, List<String>> expected, final KV<Strin
   public void test() {
 
     final TupleTag<String> outputTag = new TupleTag<>("main-output");
-    final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1));
+    final SlidingWindows slidingWindows = SlidingWindows.of(Duration.standardSeconds(2))
+      .every(Duration.standardSeconds(1));
+
     final GroupByKeyAndWindowDoFnTransform<String, String> doFnTransform =
       new GroupByKeyAndWindowDoFnTransform(
         NULL_OUTPUT_CODERS,
         outputTag,
         Collections.emptyList(), /* additional outputs */
-        WindowingStrategy.of(fixedwindows),
+        WindowingStrategy.of(slidingWindows),
         emptyList(), /* side inputs */
         PipelineOptionsFactory.as(NemoPipelineOptions.class),
         SystemReduceFn.buffering(NULL_INPUT_CODER));
@@ -99,6 +107,25 @@ public void test() {
     final Instant ts6 = new Instant(1800);
     final Instant ts7 = new Instant(1900);
     final Watermark watermark3 = new Watermark(2100);
+    final Instant ts8 = new Instant(2200);
+    final Instant ts9 = new Instant(2300);
+    final Watermark watermark4 = new Watermark(3000);
+
+
+    List<IntervalWindow> sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts1));
+    Collections.sort(sortedWindows, IntervalWindow::compareTo);
+
+    // [0---1000)
+    final IntervalWindow window0 = sortedWindows.get(0);
+    // [0---2000)
+    final IntervalWindow window1 = sortedWindows.get(1);
+
+    sortedWindows.clear();
+    sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts4));
+    Collections.sort(sortedWindows, IntervalWindow::compareTo);
+
+    // [1000--3000)
+    final IntervalWindow window2 = sortedWindows.get(1);
 
 
     final Transform.Context context = mock(Transform.Context.class);
@@ -106,35 +133,41 @@ public void test() {
     doFnTransform.prepare(context, oc);
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "hello"), ts1, fixedwindows.assignWindow(ts1), PaneInfo.NO_FIRING));
+      KV.of("1", "hello"), ts1, slidingWindows.assignWindows(ts1), PaneInfo.NO_FIRING));
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "world"), ts2, fixedwindows.assignWindow(ts2), PaneInfo.NO_FIRING));
+      KV.of("1", "world"), ts2, slidingWindows.assignWindows(ts2), PaneInfo.NO_FIRING));
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("2", "hello"), ts3, fixedwindows.assignWindow(ts3), PaneInfo.NO_FIRING));
+      KV.of("2", "hello"), ts3, slidingWindows.assignWindows(ts3), PaneInfo.NO_FIRING));
 
     doFnTransform.onWatermark(watermark);
 
+    // output
+    // 1: ["hello", "world"]
+    // 2: ["hello"]
     Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
 
     // windowed result for key 1
-    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts1)), oc.outputs.get(0).getWindows());
+    assertEquals(Arrays.asList(window0), oc.outputs.get(0).getWindows());
     checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
 
     // windowed result for key 2
-    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts3)), oc.outputs.get(1).getWindows());
+    assertEquals(Arrays.asList(window0), oc.outputs.get(1).getWindows());
     checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
 
+    assertEquals(2, oc.outputs.size());
+    assertEquals(1, oc.watermarks.size());
+
     // check output watermark
-    assertEquals(fixedwindows.assignWindow(ts1).maxTimestamp().getMillis(),
+    assertEquals(1000,
       oc.watermarks.get(0).getTimestamp());
 
     oc.outputs.clear();
     oc.watermarks.clear();
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "a"), ts4, fixedwindows.assignWindow(ts4), PaneInfo.NO_FIRING));
+      KV.of("1", "a"), ts4, slidingWindows.assignWindows(ts4), PaneInfo.NO_FIRING));
 
     // do not emit anything
     doFnTransform.onWatermark(watermark2);
@@ -142,33 +175,70 @@ public void test() {
     assertEquals(0, oc.watermarks.size());
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("2", "a"), ts5, fixedwindows.assignWindow(ts5), PaneInfo.NO_FIRING));
+      KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING));
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("3", "a"), ts6, fixedwindows.assignWindow(ts6), PaneInfo.NO_FIRING));
+      KV.of("3", "a"), ts6, slidingWindows.assignWindows(ts6), PaneInfo.NO_FIRING));
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("2", "b"), ts7, fixedwindows.assignWindow(ts7), PaneInfo.NO_FIRING));
+      KV.of("3", "b"), ts7, slidingWindows.assignWindows(ts7), PaneInfo.NO_FIRING));
 
-    // emit windowed value
+    // emit window1
     doFnTransform.onWatermark(watermark3);
 
+    // output
+    // 1: ["hello", "world", "a"]
+    // 2: ["hello"]
+    // 3: ["a", "a", "b"]
     Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
 
+
     // windowed result for key 1
-    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts4)), oc.outputs.get(0).getWindows());
-    checkOutput(KV.of("1", Arrays.asList("a")), oc.outputs.get(0).getValue());
+    assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows());
+    checkOutput(KV.of("1", Arrays.asList("hello", "world", "a")), oc.outputs.get(0).getValue());
 
     // windowed result for key 2
-    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts5)), oc.outputs.get(1).getWindows());
-    checkOutput(KV.of("2", Arrays.asList("a", "b")), oc.outputs.get(1).getValue());
+    assertEquals(Arrays.asList(window1), oc.outputs.get(1).getWindows());
+    checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
+
+    // windowed result for key 3
+    assertEquals(Arrays.asList(window1), oc.outputs.get(2).getWindows());
+    checkOutput(KV.of("3", Arrays.asList("a", "a", "b")), oc.outputs.get(2).getValue());
+
+    // check output watermark
+    assertEquals(2000,
+      oc.watermarks.get(0).getTimestamp());
+
+    oc.outputs.clear();
+    oc.watermarks.clear();
+
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "a"), ts8, slidingWindows.assignWindows(ts8), PaneInfo.NO_FIRING));
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("3", "b"), ts9, slidingWindows.assignWindows(ts9), PaneInfo.NO_FIRING));
+
+    // emit window2
+    doFnTransform.onWatermark(watermark4);
+
+    // output
+    // 1: ["a", "a"]
+    // 3: ["a", "a", "b", "b"]
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+    assertEquals(2, oc.outputs.size());
+
+    // windowed result for key 1
+    assertEquals(Arrays.asList(window2), oc.outputs.get(0).getWindows());
+    checkOutput(KV.of("1", Arrays.asList("a", "a")), oc.outputs.get(0).getValue());
 
     // windowed result for key 3
-    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts6)), oc.outputs.get(2).getWindows());
-    checkOutput(KV.of("3", Arrays.asList("a")), oc.outputs.get(2).getValue());
+    assertEquals(Arrays.asList(window2), oc.outputs.get(1).getWindows());
+    checkOutput(KV.of("3", Arrays.asList("a", "a", "b", "b")), oc.outputs.get(1).getValue());
 
     // check output watermark
-    assertEquals(fixedwindows.assignWindow(ts4).maxTimestamp().getMillis(),
+    assertEquals(3000,
       oc.watermarks.get(0).getTimestamp());
 
     doFnTransform.close();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services