You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/24 09:52:55 UTC

[8/9] flink git commit: [FLINK-4994] Don't Clear Trigger State and Merging Window Set When Purging

[FLINK-4994] Don't Clear Trigger State and Merging Window Set When Purging

Before, when a Trigger returns TriggerResult.PURGE from any of the
on*() methods the WindowOperator will clear all state of that window
(window contents, merging window set) and call Trigger.clear() so that the
Trigger can clean up its state/timers.

This was problematic in some cases. For example, with merging windows (session
windows) this means that a late-arriving element will not be put into the
session that was previously built up but will be put into a completely new
session that only contains this one element.

The new behaviour is this:
 * Only clean window contents on PURGE
 * Register cleanup timer for any window, don't delete this on PURGE
 * When the cleanup timer fires: clean window state, clean merging window set,
call Trigger.clear() to allow it to clean state/timers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b331a42
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b331a42
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b331a42

Branch: refs/heads/master
Commit: 0b331a421267a541d91e94f2713534704ed32bed
Parents: bcca3fe
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Nov 2 11:51:07 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jan 24 10:42:34 2017 +0100

----------------------------------------------------------------------
 .../windowing/EvictingWindowOperator.java       | 119 +++++++-------
 .../operators/windowing/WindowOperator.java     | 155 +++++++++++--------
 .../operators/windowing/WindowOperatorTest.java |  11 +-
 3 files changed, 158 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0b331a42/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index d9c977a..45fea14 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
@@ -57,20 +57,20 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
  */
 @Internal
-public class EvictingWindowOperator<K, IN, OUT, W extends Window> 
+public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 		extends WindowOperator<K, IN, Iterable<IN>, OUT, W> {
 
 	private static final long serialVersionUID = 1L;
 
 	// ------------------------------------------------------------------------
-	// these fields are set by the API stream graph builder to configure the operator 
-	
+	// these fields are set by the API stream graph builder to configure the operator
+
 	private final Evictor<? super IN, ? super W> evictor;
 
 	private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> evictingWindowStateDescriptor;
 
 	// ------------------------------------------------------------------------
-	// the fields below are instantiated once the operator runs in the runtime 
+	// the fields below are instantiated once the operator runs in the runtime
 
 	private transient EvictorContext evictorContext;
 
@@ -146,7 +146,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 				if (stateWindow == null) {
 					throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
 				}
-				
+
 				evictingWindowState.setCurrentNamespace(stateWindow);
 				evictingWindowState.add(element);
 
@@ -163,14 +163,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 						// if we have no state, there is nothing to do
 						continue;
 					}
-					fire(actualWindow, contents, evictingWindowState);
+					emitWindowContents(actualWindow, contents, evictingWindowState);
 				}
 
 				if (triggerResult.isPurge()) {
-					cleanup(actualWindow, evictingWindowState, mergingWindows);
-				} else {
-					registerCleanupTimer(actualWindow);
+					evictingWindowState.clear();
 				}
+				registerCleanupTimer(actualWindow);
 			}
 
 			mergingWindows.persist();
@@ -198,14 +197,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 						// if we have no state, there is nothing to do
 						continue;
 					}
-					fire(window, contents, evictingWindowState);
+					emitWindowContents(window, contents, evictingWindowState);
 				}
 
 				if (triggerResult.isPurge()) {
-					cleanup(window, evictingWindowState, null);
-				} else {
-					registerCleanupTimer(window);
+					evictingWindowState.clear();
 				}
+				registerCleanupTimer(window);
 			}
 		}
 	}
@@ -218,37 +216,42 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 		evictorContext.key = timer.getKey();
 		evictorContext.window = timer.getNamespace();
 
-		ListState<StreamRecord<IN>> windowState;
 		MergingWindowSet<W> mergingWindows = null;
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			mergingWindows = getMergingWindowSet();
 			W stateWindow = mergingWindows.getStateWindow(context.window);
 			if (stateWindow == null) {
-				// then the window is already purged and this is a cleanup
-				// timer set due to allowed lateness that has nothing to clean,
-				// so it is safe to just ignore
+				// Timer firing for non-existent window, this can only happen if a
+				// trigger did not clean up timers. We have already cleared the merging
+				// window and therefore the Trigger state, however, so nothing to do.
 				return;
+			} else {
+				evictingWindowState.setCurrentNamespace(stateWindow);
 			}
-			
-			evictingWindowState.setCurrentNamespace(stateWindow);
 		} else {
 			evictingWindowState.setCurrentNamespace(context.window);
 		}
 
 		Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
-		if (contents == null) {
-			// if we have no state, there is nothing to do
-			return;
+
+		if (contents != null) {
+			TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+			if (triggerResult.isFire()) {
+				emitWindowContents(context.window, contents, evictingWindowState);
+			}
+			if (triggerResult.isPurge()) {
+				evictingWindowState.clear();
+			}
 		}
 
-		TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
-		if (triggerResult.isFire()) {
-			fire(context.window, contents, evictingWindowState);
+		if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
+			clearAllState(context.window, evictingWindowState, mergingWindows);
 		}
 
-		if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
-			cleanup(context.window, evictingWindowState, mergingWindows);
+		if (mergingWindows != null) {
+			// need to make sure to update the merging state in state
+			mergingWindows.persist();
 		}
 	}
 
@@ -259,40 +262,46 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 		evictorContext.key = timer.getKey();
 		evictorContext.window = timer.getNamespace();
 
-		ListState<StreamRecord<IN>> windowState;
 		MergingWindowSet<W> mergingWindows = null;
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			mergingWindows = getMergingWindowSet();
 			W stateWindow = mergingWindows.getStateWindow(context.window);
 			if (stateWindow == null) {
-				// then the window is already purged and this is a cleanup
-				// timer set due to allowed lateness that has nothing to clean,
-				// so it is safe to just ignore
+				// Timer firing for non-existent window, this can only happen if a
+				// trigger did not clean up timers. We have already cleared the merging
+				// window and therefore the Trigger state, however, so nothing to do.
 				return;
+			} else {
+				evictingWindowState.setCurrentNamespace(stateWindow);
 			}
-			evictingWindowState.setCurrentNamespace(stateWindow);
 		} else {
 			evictingWindowState.setCurrentNamespace(context.window);
 		}
 
 		Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
-		if (contents == null) {
-			// if we have no state, there is nothing to do
-			return;
+
+		if (contents != null) {
+			TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+			if (triggerResult.isFire()) {
+				emitWindowContents(context.window, contents, evictingWindowState);
+			}
+			if (triggerResult.isPurge()) {
+				evictingWindowState.clear();
+			}
 		}
 
-		TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
-		if (triggerResult.isFire()) {
-			fire(context.window, contents, evictingWindowState);
+		if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
+			clearAllState(context.window, evictingWindowState, mergingWindows);
 		}
 
-		if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
-			cleanup(context.window, evictingWindowState, mergingWindows);
+		if (mergingWindows != null) {
+			// need to make sure to update the merging state in state
+			mergingWindows.persist();
 		}
 	}
 
-	private void fire(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
+	private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
 		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
 
 		// Work around type system restrictions...
@@ -326,6 +335,18 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 		}
 	}
 
+	private void clearAllState(
+			W window,
+			ListState<StreamRecord<IN>> windowState,
+			MergingWindowSet<W> mergingWindows) throws Exception {
+
+		windowState.clear();
+		context.clear();
+		if (mergingWindows != null) {
+			mergingWindows.retireWindow(window);
+			mergingWindows.persist();
+		}
+	}
 
 	/**
 	 * {@code EvictorContext} is a utility for handling {@code Evictor} invocations. It can be reused
@@ -372,24 +393,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 		}
 	}
 
-	private void cleanup(W window,
-						ListState<StreamRecord<IN>> windowState,
-						MergingWindowSet<W> mergingWindows) throws Exception {
-
-		windowState.clear();
-		if (mergingWindows != null) {
-			mergingWindows.retireWindow(window);
-			mergingWindows.persist();
-		}
-		context.clear();
-	}
-
 	@Override
 	public void open() throws Exception {
 		super.open();
 
 		evictorContext = new EvictorContext(null,null);
-		evictingWindowState = (InternalListState<W, StreamRecord<IN>>) 
+		evictingWindowState = (InternalListState<W, StreamRecord<IN>>)
 				getOrCreateKeyedState(windowSerializer, evictingWindowStateDescriptor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0b331a42/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 0dbaffd..3c4f397 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -139,8 +139,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	/** The state in which the window contents is stored. Each window is a namespace */
 	private transient InternalAppendingState<W, IN, ACC> windowState;
 
-	/** The {@link #windowState}, typed to merging state for merging windows.
-	 * Null if the window state is not mergeable */
+	/**
+	 * The {@link #windowState}, typed to merging state for merging windows.
+	 * Null if the window state is not mergeable.
+	 */
 	private transient InternalMergingState<W, IN, ACC> windowMergingState;
 
 	/** The state that holds the merging window metadata (the sets that describe what is merged) */
@@ -292,7 +294,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 					new ListStateDescriptor<>("merging-window-set", tupleSerializer);
 
 			// get the state that stores the merging sets
-			mergingSetsState = (InternalListState<VoidNamespace, Tuple2<W, W>>) 
+			mergingSetsState = (InternalListState<VoidNamespace, Tuple2<W, W>>)
 					getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor);
 			mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
 		}
@@ -320,7 +322,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	public void processElement(StreamRecord<IN> element) throws Exception {
 		final Collection<W> elementWindows = windowAssigner.assignWindows(
 			element.getValue(), element.getTimestamp(), windowAssignerContext);
-		
+
 		final K key = this.<K>getKeyedStateBackend().getCurrentKey();
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
@@ -376,14 +378,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 					if (contents == null) {
 						continue;
 					}
-					fire(actualWindow, contents);
+					emitWindowContents(actualWindow, contents);
 				}
 
 				if (triggerResult.isPurge()) {
-					cleanup(actualWindow, windowState, mergingWindows);
-				} else {
-					registerCleanupTimer(actualWindow);
+					windowState.clear();
 				}
+				registerCleanupTimer(actualWindow);
 			}
 
 			// need to make sure to update the merging state in state
@@ -409,14 +410,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 					if (contents == null) {
 						continue;
 					}
-					fire(window, contents);
+					emitWindowContents(window, contents);
 				}
 
 				if (triggerResult.isPurge()) {
-					cleanup(window, windowState, null);
-				} else {
-					registerCleanupTimer(window);
+					windowState.clear();
 				}
+				registerCleanupTimer(window);
 			}
 		}
 	}
@@ -432,31 +432,40 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			mergingWindows = getMergingWindowSet();
 			W stateWindow = mergingWindows.getStateWindow(context.window);
 			if (stateWindow == null) {
-				// then the window is already purged and this is a cleanup
-				// timer set due to allowed lateness that has nothing to clean,
-				// so it is safe to just ignore
+				// Timer firing for non-existent window, this can only happen if a
+				// trigger did not clean up timers. We have already cleared the merging
+				// window and therefore the Trigger state, however, so nothing to do.
 				return;
+			} else {
+				windowState.setCurrentNamespace(stateWindow);
 			}
-			
-			windowState.setCurrentNamespace(stateWindow);
 		} else {
 			windowState.setCurrentNamespace(context.window);
 			mergingWindows = null;
 		}
 
-		ACC contents = windowState.get();
-		if (contents == null) {
-			// if we have no state, there is nothing to do
-			return;
+		ACC contents = null;
+		if (windowState != null) {
+			contents = windowState.get();
 		}
 
-		TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
-		if (triggerResult.isFire()) {
-			fire(context.window, contents);
+		if (contents != null) {
+			TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+			if (triggerResult.isFire()) {
+				emitWindowContents(context.window, contents);
+			}
+			if (triggerResult.isPurge()) {
+				windowState.clear();
+			}
 		}
 
-		if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
-			cleanup(context.window, windowState, mergingWindows);
+		if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
+			clearAllState(context.window, windowState, mergingWindows);
+		}
+
+		if (mergingWindows != null) {
+			// need to make sure to update the merging state in state
+			mergingWindows.persist();
 		}
 	}
 
@@ -471,55 +480,67 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			mergingWindows = getMergingWindowSet();
 			W stateWindow = mergingWindows.getStateWindow(context.window);
 			if (stateWindow == null) {
-				// then the window is already purged and this is a cleanup
-				// timer set due to allowed lateness that has nothing to clean,
-				// so it is safe to just ignore
+				// Timer firing for non-existent window, this can only happen if a
+				// trigger did not clean up timers. We have already cleared the merging
+				// window and therefore the Trigger state, however, so nothing to do.
 				return;
+			} else {
+				windowState.setCurrentNamespace(stateWindow);
 			}
-			windowState.setCurrentNamespace(stateWindow);
 		} else {
 			windowState.setCurrentNamespace(context.window);
 			mergingWindows = null;
 		}
 
-		ACC contents = windowState.get();
-		if (contents == null) {
-			// if we have no state, there is nothing to do
-			return;
+		ACC contents = null;
+		if (windowState != null) {
+			contents = windowState.get();
+		}
+
+		if (contents != null) {
+			TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+			if (triggerResult.isFire()) {
+				emitWindowContents(context.window, contents);
+			}
+			if (triggerResult.isPurge()) {
+				windowState.clear();
+			}
 		}
 
-		TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
-		if (triggerResult.isFire()) {
-			fire(context.window, contents);
+		if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
+			clearAllState(context.window, windowState, mergingWindows);
 		}
 
-		if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
-			cleanup(context.window, windowState, mergingWindows);
+		if (mergingWindows != null) {
+			// need to make sure to update the merging state in state
+			mergingWindows.persist();
 		}
 	}
 
 	/**
-	 * Cleans up the window state if the provided {@link TriggerResult} requires so, or if it
-	 * is time to do so (see {@link #isCleanupTime(Window, long)}). The caller must ensure that the
+	 * Drops all state for the given window and calls
+	 * {@link Trigger#clear(Window, Trigger.TriggerContext)}.
+	 *
+	 * <p>The caller must ensure that the
 	 * correct key is set in the state backend and the context object.
 	 */
-	private void cleanup(W window,
-						AppendingState<IN, ACC> windowState,
-						MergingWindowSet<W> mergingWindows) throws Exception {
+	private void clearAllState(
+			W window,
+			AppendingState<IN, ACC> windowState,
+			MergingWindowSet<W> mergingWindows) throws Exception {
 		windowState.clear();
+		context.clear();
 		if (mergingWindows != null) {
 			mergingWindows.retireWindow(window);
 			mergingWindows.persist();
 		}
-		context.clear();
 	}
 
 	/**
-	 * Triggers the window computation if the provided {@link TriggerResult} requires so.
-	 * The caller must ensure that the correct key is set in the state backend and the context object.
+	 * Emits the contents of the given window using the {@link InternalWindowFunction}.
 	 */
 	@SuppressWarnings("unchecked")
-	private void fire(W window, ACC contents) throws Exception {
+	private void emitWindowContents(W window, ACC contents) throws Exception {
 		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
 		userFunction.apply(context.key, context.window, contents, timestampedCollector);
 	}
@@ -538,12 +559,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	}
 
 	/**
-	 * Decides if a window is currently late or not, based on the current
-	 * watermark, i.e. the current event time, and the allowed lateness.
-	 * @param window
-	 * 					The collection of windows returned by the {@link WindowAssigner}.
-	 * @return The windows (among the {@code eligibleWindows}) for which the element should still be
-	 * 					considered when triggering.
+	 * Returns {@code true} if the watermark is after the end timestamp plus the allowed lateness
+	 * of the given window.
 	 */
 	protected boolean isLate(W window) {
 		return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
@@ -556,6 +573,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 */
 	protected void registerCleanupTimer(W window) {
 		long cleanupTime = cleanupTime(window);
+		if (cleanupTime == Long.MAX_VALUE) {
+			// don't set a GC timer for "end of time"
+			return;
+		}
+
 		if (windowAssigner.isEventTime()) {
 			context.registerEventTimeTimer(cleanupTime);
 		} else {
@@ -570,6 +592,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 */
 	protected void deleteCleanupTimer(W window) {
 		long cleanupTime = cleanupTime(window);
+		if (cleanupTime == Long.MAX_VALUE) {
+			// no need to clean up because we didn't set one
+			return;
+		}
 		if (windowAssigner.isEventTime()) {
 			context.deleteEventTimeTimer(cleanupTime);
 		} else {
@@ -587,24 +613,19 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 * @param window the window whose cleanup time we are computing.
 	 */
 	private long cleanupTime(W window) {
-		long cleanupTime = window.maxTimestamp() + allowedLateness;
-		return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
+		if (windowAssigner.isEventTime()) {
+			long cleanupTime = window.maxTimestamp() + allowedLateness;
+			return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
+		} else {
+			return window.maxTimestamp();
+		}
 	}
 
 	/**
-	 * Decides if it is time to clean up the window state.
-	 * Clean up time for a window is:
-	 * 		<li> if it is event time, after the watermark passes the end of the window plus the user-specified allowed lateness
-	 * 		<li> if it is processing time, after the processing time at the node passes the end of the window.
-	 * 	@param window
-	 * 					the window to clean
-	 *  @param time
-	 *  				the current time (event or processing depending on the {@link WindowAssigner}
-	 *  @return {@code true} if it is time to clean up the window state, {@code false} otherwise.
+	 * Returns {@code true} if the given time is the cleanup time for the given window.
 	 */
 	protected final boolean isCleanupTime(W window, long time) {
-		long cleanupTime = cleanupTime(window);
-		return  cleanupTime == time;
+		return time == cleanupTime(window);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0b331a42/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 2faa506..6238e6c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -1601,20 +1601,21 @@ public class WindowOperatorTest extends TestLogger {
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
 		expected.add(new Watermark(14600));
 
-		// dropped as late
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
 
+		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 14600L), 14599));
+
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
 		testHarness.processWatermark(new Watermark(20000));
 
-		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499));
+		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 17500L), 17499));
 		expected.add(new Watermark(20000));
 
 		testHarness.processWatermark(new Watermark(100000));
 		expected.add(new Watermark(100000));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
 		testHarness.close();
 	}
 
@@ -1780,7 +1781,7 @@ public class WindowOperatorTest extends TestLogger {
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
 
-		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 13000L), 12999));
+		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 14600L), 14599));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
@@ -1788,7 +1789,7 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
 		testHarness.processWatermark(new Watermark(20000));
 
-		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499));
+		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 17500L), 17499));
 		expected.add(new Watermark(20000));
 
 		testHarness.processWatermark(new Watermark(100000));