You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/24 09:52:55 UTC
[8/9] flink git commit: [FLINK-4994] Don't Clear Trigger State and
Merging Window Set When Purging
[FLINK-4994] Don't Clear Trigger State and Merging Window Set When Purging
Before, when a Trigger returned TriggerResult.PURGE from any of the
on*() methods, the WindowOperator would clear all state of that window
(window contents, merging window set) and call Trigger.clear() so that the
Trigger could clean up its state/timers.
This was problematic in some cases. For example, with merging windows (session
windows) this meant that a late-arriving element would not be put into the
session that was previously built up but would instead be put into a completely
new session that only contained this one element.
The new behaviour is this:
* Only clean window contents on PURGE
* Register a cleanup timer for every window and don't delete that timer on PURGE
* When the cleanup timer fires: clean window state, clean merging window set,
call Trigger.clear() to allow it to clean state/timers
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b331a42
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b331a42
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b331a42
Branch: refs/heads/master
Commit: 0b331a421267a541d91e94f2713534704ed32bed
Parents: bcca3fe
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Nov 2 11:51:07 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jan 24 10:42:34 2017 +0100
----------------------------------------------------------------------
.../windowing/EvictingWindowOperator.java | 119 +++++++-------
.../operators/windowing/WindowOperator.java | 155 +++++++++++--------
.../operators/windowing/WindowOperatorTest.java | 11 +-
3 files changed, 158 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0b331a42/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index d9c977a..45fea14 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -57,20 +57,20 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
*/
@Internal
-public class EvictingWindowOperator<K, IN, OUT, W extends Window>
+public class EvictingWindowOperator<K, IN, OUT, W extends Window>
extends WindowOperator<K, IN, Iterable<IN>, OUT, W> {
private static final long serialVersionUID = 1L;
// ------------------------------------------------------------------------
- // these fields are set by the API stream graph builder to configure the operator
-
+ // these fields are set by the API stream graph builder to configure the operator
+
private final Evictor<? super IN, ? super W> evictor;
private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> evictingWindowStateDescriptor;
// ------------------------------------------------------------------------
- // the fields below are instantiated once the operator runs in the runtime
+ // the fields below are instantiated once the operator runs in the runtime
private transient EvictorContext evictorContext;
@@ -146,7 +146,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
if (stateWindow == null) {
throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
}
-
+
evictingWindowState.setCurrentNamespace(stateWindow);
evictingWindowState.add(element);
@@ -163,14 +163,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
// if we have no state, there is nothing to do
continue;
}
- fire(actualWindow, contents, evictingWindowState);
+ emitWindowContents(actualWindow, contents, evictingWindowState);
}
if (triggerResult.isPurge()) {
- cleanup(actualWindow, evictingWindowState, mergingWindows);
- } else {
- registerCleanupTimer(actualWindow);
+ evictingWindowState.clear();
}
+ registerCleanupTimer(actualWindow);
}
mergingWindows.persist();
@@ -198,14 +197,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
// if we have no state, there is nothing to do
continue;
}
- fire(window, contents, evictingWindowState);
+ emitWindowContents(window, contents, evictingWindowState);
}
if (triggerResult.isPurge()) {
- cleanup(window, evictingWindowState, null);
- } else {
- registerCleanupTimer(window);
+ evictingWindowState.clear();
}
+ registerCleanupTimer(window);
}
}
}
@@ -218,37 +216,42 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
evictorContext.key = timer.getKey();
evictorContext.window = timer.getNamespace();
- ListState<StreamRecord<IN>> windowState;
MergingWindowSet<W> mergingWindows = null;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(context.window);
if (stateWindow == null) {
- // then the window is already purged and this is a cleanup
- // timer set due to allowed lateness that has nothing to clean,
- // so it is safe to just ignore
+ // Timer firing for non-existent window, this can only happen if a
+ // trigger did not clean up timers. We have already cleared the merging
+ // window and therefore the Trigger state, however, so nothing to do.
return;
+ } else {
+ evictingWindowState.setCurrentNamespace(stateWindow);
}
-
- evictingWindowState.setCurrentNamespace(stateWindow);
} else {
evictingWindowState.setCurrentNamespace(context.window);
}
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
- if (contents == null) {
- // if we have no state, there is nothing to do
- return;
+
+ if (contents != null) {
+ TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+ if (triggerResult.isFire()) {
+ emitWindowContents(context.window, contents, evictingWindowState);
+ }
+ if (triggerResult.isPurge()) {
+ evictingWindowState.clear();
+ }
}
- TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
- if (triggerResult.isFire()) {
- fire(context.window, contents, evictingWindowState);
+ if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
+ clearAllState(context.window, evictingWindowState, mergingWindows);
}
- if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
- cleanup(context.window, evictingWindowState, mergingWindows);
+ if (mergingWindows != null) {
+ // need to make sure to update the merging state in state
+ mergingWindows.persist();
}
}
@@ -259,40 +262,46 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
evictorContext.key = timer.getKey();
evictorContext.window = timer.getNamespace();
- ListState<StreamRecord<IN>> windowState;
MergingWindowSet<W> mergingWindows = null;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(context.window);
if (stateWindow == null) {
- // then the window is already purged and this is a cleanup
- // timer set due to allowed lateness that has nothing to clean,
- // so it is safe to just ignore
+ // Timer firing for non-existent window, this can only happen if a
+ // trigger did not clean up timers. We have already cleared the merging
+ // window and therefore the Trigger state, however, so nothing to do.
return;
+ } else {
+ evictingWindowState.setCurrentNamespace(stateWindow);
}
- evictingWindowState.setCurrentNamespace(stateWindow);
} else {
evictingWindowState.setCurrentNamespace(context.window);
}
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
- if (contents == null) {
- // if we have no state, there is nothing to do
- return;
+
+ if (contents != null) {
+ TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+ if (triggerResult.isFire()) {
+ emitWindowContents(context.window, contents, evictingWindowState);
+ }
+ if (triggerResult.isPurge()) {
+ evictingWindowState.clear();
+ }
}
- TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
- if (triggerResult.isFire()) {
- fire(context.window, contents, evictingWindowState);
+ if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
+ clearAllState(context.window, evictingWindowState, mergingWindows);
}
- if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
- cleanup(context.window, evictingWindowState, mergingWindows);
+ if (mergingWindows != null) {
+ // need to make sure to update the merging state in state
+ mergingWindows.persist();
}
}
- private void fire(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
+ private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
// Work around type system restrictions...
@@ -326,6 +335,18 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
}
}
+ private void clearAllState(
+ W window,
+ ListState<StreamRecord<IN>> windowState,
+ MergingWindowSet<W> mergingWindows) throws Exception {
+
+ windowState.clear();
+ context.clear();
+ if (mergingWindows != null) {
+ mergingWindows.retireWindow(window);
+ mergingWindows.persist();
+ }
+ }
/**
* {@code EvictorContext} is a utility for handling {@code Evictor} invocations. It can be reused
@@ -372,24 +393,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
}
}
- private void cleanup(W window,
- ListState<StreamRecord<IN>> windowState,
- MergingWindowSet<W> mergingWindows) throws Exception {
-
- windowState.clear();
- if (mergingWindows != null) {
- mergingWindows.retireWindow(window);
- mergingWindows.persist();
- }
- context.clear();
- }
-
@Override
public void open() throws Exception {
super.open();
evictorContext = new EvictorContext(null,null);
- evictingWindowState = (InternalListState<W, StreamRecord<IN>>)
+ evictingWindowState = (InternalListState<W, StreamRecord<IN>>)
getOrCreateKeyedState(windowSerializer, evictingWindowStateDescriptor);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b331a42/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 0dbaffd..3c4f397 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -139,8 +139,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
/** The state in which the window contents is stored. Each window is a namespace */
private transient InternalAppendingState<W, IN, ACC> windowState;
- /** The {@link #windowState}, typed to merging state for merging windows.
- * Null if the window state is not mergeable */
+ /**
+ * The {@link #windowState}, typed to merging state for merging windows.
+ * Null if the window state is not mergeable.
+ */
private transient InternalMergingState<W, IN, ACC> windowMergingState;
/** The state that holds the merging window metadata (the sets that describe what is merged) */
@@ -292,7 +294,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
new ListStateDescriptor<>("merging-window-set", tupleSerializer);
// get the state that stores the merging sets
- mergingSetsState = (InternalListState<VoidNamespace, Tuple2<W, W>>)
+ mergingSetsState = (InternalListState<VoidNamespace, Tuple2<W, W>>)
getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor);
mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
}
@@ -320,7 +322,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
public void processElement(StreamRecord<IN> element) throws Exception {
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
-
+
final K key = this.<K>getKeyedStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
@@ -376,14 +378,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
if (contents == null) {
continue;
}
- fire(actualWindow, contents);
+ emitWindowContents(actualWindow, contents);
}
if (triggerResult.isPurge()) {
- cleanup(actualWindow, windowState, mergingWindows);
- } else {
- registerCleanupTimer(actualWindow);
+ windowState.clear();
}
+ registerCleanupTimer(actualWindow);
}
// need to make sure to update the merging state in state
@@ -409,14 +410,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
if (contents == null) {
continue;
}
- fire(window, contents);
+ emitWindowContents(window, contents);
}
if (triggerResult.isPurge()) {
- cleanup(window, windowState, null);
- } else {
- registerCleanupTimer(window);
+ windowState.clear();
}
+ registerCleanupTimer(window);
}
}
}
@@ -432,31 +432,40 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(context.window);
if (stateWindow == null) {
- // then the window is already purged and this is a cleanup
- // timer set due to allowed lateness that has nothing to clean,
- // so it is safe to just ignore
+ // Timer firing for non-existent window, this can only happen if a
+ // trigger did not clean up timers. We have already cleared the merging
+ // window and therefore the Trigger state, however, so nothing to do.
return;
+ } else {
+ windowState.setCurrentNamespace(stateWindow);
}
-
- windowState.setCurrentNamespace(stateWindow);
} else {
windowState.setCurrentNamespace(context.window);
mergingWindows = null;
}
- ACC contents = windowState.get();
- if (contents == null) {
- // if we have no state, there is nothing to do
- return;
+ ACC contents = null;
+ if (windowState != null) {
+ contents = windowState.get();
}
- TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
- if (triggerResult.isFire()) {
- fire(context.window, contents);
+ if (contents != null) {
+ TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+ if (triggerResult.isFire()) {
+ emitWindowContents(context.window, contents);
+ }
+ if (triggerResult.isPurge()) {
+ windowState.clear();
+ }
}
- if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
- cleanup(context.window, windowState, mergingWindows);
+ if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
+ clearAllState(context.window, windowState, mergingWindows);
+ }
+
+ if (mergingWindows != null) {
+ // need to make sure to update the merging state in state
+ mergingWindows.persist();
}
}
@@ -471,55 +480,67 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(context.window);
if (stateWindow == null) {
- // then the window is already purged and this is a cleanup
- // timer set due to allowed lateness that has nothing to clean,
- // so it is safe to just ignore
+ // Timer firing for non-existent window, this can only happen if a
+ // trigger did not clean up timers. We have already cleared the merging
+ // window and therefore the Trigger state, however, so nothing to do.
return;
+ } else {
+ windowState.setCurrentNamespace(stateWindow);
}
- windowState.setCurrentNamespace(stateWindow);
} else {
windowState.setCurrentNamespace(context.window);
mergingWindows = null;
}
- ACC contents = windowState.get();
- if (contents == null) {
- // if we have no state, there is nothing to do
- return;
+ ACC contents = null;
+ if (windowState != null) {
+ contents = windowState.get();
+ }
+
+ if (contents != null) {
+ TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+ if (triggerResult.isFire()) {
+ emitWindowContents(context.window, contents);
+ }
+ if (triggerResult.isPurge()) {
+ windowState.clear();
+ }
}
- TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
- if (triggerResult.isFire()) {
- fire(context.window, contents);
+ if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
+ clearAllState(context.window, windowState, mergingWindows);
}
- if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
- cleanup(context.window, windowState, mergingWindows);
+ if (mergingWindows != null) {
+ // need to make sure to update the merging state in state
+ mergingWindows.persist();
}
}
/**
- * Cleans up the window state if the provided {@link TriggerResult} requires so, or if it
- * is time to do so (see {@link #isCleanupTime(Window, long)}). The caller must ensure that the
+ * Drops all state for the given window and calls
+ * {@link Trigger#clear(Window, Trigger.TriggerContext)}.
+ *
+ * <p>The caller must ensure that the
* correct key is set in the state backend and the context object.
*/
- private void cleanup(W window,
- AppendingState<IN, ACC> windowState,
- MergingWindowSet<W> mergingWindows) throws Exception {
+ private void clearAllState(
+ W window,
+ AppendingState<IN, ACC> windowState,
+ MergingWindowSet<W> mergingWindows) throws Exception {
windowState.clear();
+ context.clear();
if (mergingWindows != null) {
mergingWindows.retireWindow(window);
mergingWindows.persist();
}
- context.clear();
}
/**
- * Triggers the window computation if the provided {@link TriggerResult} requires so.
- * The caller must ensure that the correct key is set in the state backend and the context object.
+ * Emits the contents of the given window using the {@link InternalWindowFunction}.
*/
@SuppressWarnings("unchecked")
- private void fire(W window, ACC contents) throws Exception {
+ private void emitWindowContents(W window, ACC contents) throws Exception {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
userFunction.apply(context.key, context.window, contents, timestampedCollector);
}
@@ -538,12 +559,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
/**
- * Decides if a window is currently late or not, based on the current
- * watermark, i.e. the current event time, and the allowed lateness.
- * @param window
- * The collection of windows returned by the {@link WindowAssigner}.
- * @return The windows (among the {@code eligibleWindows}) for which the element should still be
- * considered when triggering.
+ * Returns {@code true} if the watermark is after the end timestamp plus the allowed lateness
+ * of the given window.
*/
protected boolean isLate(W window) {
return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
@@ -556,6 +573,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
*/
protected void registerCleanupTimer(W window) {
long cleanupTime = cleanupTime(window);
+ if (cleanupTime == Long.MAX_VALUE) {
+ // don't set a GC timer for "end of time"
+ return;
+ }
+
if (windowAssigner.isEventTime()) {
context.registerEventTimeTimer(cleanupTime);
} else {
@@ -570,6 +592,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
*/
protected void deleteCleanupTimer(W window) {
long cleanupTime = cleanupTime(window);
+ if (cleanupTime == Long.MAX_VALUE) {
+ // no need to clean up because we didn't set one
+ return;
+ }
if (windowAssigner.isEventTime()) {
context.deleteEventTimeTimer(cleanupTime);
} else {
@@ -587,24 +613,19 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* @param window the window whose cleanup time we are computing.
*/
private long cleanupTime(W window) {
- long cleanupTime = window.maxTimestamp() + allowedLateness;
- return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
+ if (windowAssigner.isEventTime()) {
+ long cleanupTime = window.maxTimestamp() + allowedLateness;
+ return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
+ } else {
+ return window.maxTimestamp();
+ }
}
/**
- * Decides if it is time to clean up the window state.
- * Clean up time for a window is:
- * <li> if it is event time, after the watermark passes the end of the window plus the user-specified allowed lateness
- * <li> if it is processing time, after the processing time at the node passes the end of the window.
- * @param window
- * the window to clean
- * @param time
- * the current time (event or processing depending on the {@link WindowAssigner}
- * @return {@code true} if it is time to clean up the window state, {@code false} otherwise.
+ * Returns {@code true} if the given time is the cleanup time for the given window.
*/
protected final boolean isCleanupTime(W window, long time) {
- long cleanupTime = cleanupTime(window);
- return cleanupTime == time;
+ return time == cleanupTime(window);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0b331a42/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 2faa506..6238e6c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -1601,20 +1601,21 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
expected.add(new Watermark(14600));
- // dropped as late
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
+ expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 14600L), 14599));
+
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
testHarness.processWatermark(new Watermark(20000));
- expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499));
+ expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 17500L), 17499));
expected.add(new Watermark(20000));
testHarness.processWatermark(new Watermark(100000));
expected.add(new Watermark(100000));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
testHarness.close();
}
@@ -1780,7 +1781,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
- expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 13000L), 12999));
+ expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 14600L), 14599));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
@@ -1788,7 +1789,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
testHarness.processWatermark(new Watermark(20000));
- expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499));
+ expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 17500L), 17499));
expected.add(new Watermark(20000));
testHarness.processWatermark(new Watermark(100000));