You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/23 14:01:49 UTC
[09/10] flink git commit: [FLINK-4994] Don't Clear Trigger State and Merging Window Set When Purging
[FLINK-4994] Don't Clear Trigger State and Merging Window Set When Purging
Previously, when a Trigger returned TriggerResult.PURGE from any of its
on*() methods, the WindowOperator cleared all state of that window
(window contents and merging window set) and called Trigger.clear() so that
the Trigger could clean up its state/timers.
This was problematic in some cases. For example, with merging windows (session
windows) it meant that a late-arriving element was not added to the
session that had previously been built up, but was instead put into a completely new
session containing only that one element.
The new behaviour is:
* On PURGE, only clear the window contents
* Register a cleanup timer for every window, and do not delete it on PURGE
* When the cleanup timer fires: clear the window state, clear the merging window set,
and call Trigger.clear() so that the Trigger can clean up its state/timers
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9189c20
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9189c20
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9189c20
Branch: refs/heads/release-1.2
Commit: a9189c2055fd3c10d741f63208aadca7fb4218f5
Parents: 704b411
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Nov 2 11:51:07 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jan 23 14:53:22 2017 +0100
----------------------------------------------------------------------
.../windowing/EvictingWindowOperator.java | 118 ++++++++------
.../operators/windowing/WindowOperator.java | 155 +++++++++++--------
.../operators/windowing/WindowOperatorTest.java | 11 +-
3 files changed, 162 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a9189c20/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 8c73878..17b3984 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -154,14 +154,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
// if we have no state, there is nothing to do
continue;
}
- fire(actualWindow, contents, windowState);
+ emitWindowContents(actualWindow, contents, windowState);
}
if (triggerResult.isPurge()) {
- cleanup(actualWindow, windowState, mergingWindows);
- } else {
- registerCleanupTimer(actualWindow);
+ windowState.clear();
}
+ registerCleanupTimer(actualWindow);
}
mergingWindows.persist();
@@ -190,14 +189,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
// if we have no state, there is nothing to do
continue;
}
- fire(window, contents, windowState);
+ emitWindowContents(window, contents, windowState);
}
if (triggerResult.isPurge()) {
- cleanup(window, windowState, null);
- } else {
- registerCleanupTimer(window);
+ windowState.clear();
}
+ registerCleanupTimer(window);
}
}
}
@@ -217,12 +215,14 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(context.window);
if (stateWindow == null) {
- // then the window is already purged and this is a cleanup
- // timer set due to allowed lateness that has nothing to clean,
- // so it is safe to just ignore
- return;
+ // timer firing for non-existent window, still have to run the cleanup logic
+ windowState = null;
+ } else {
+ windowState = getPartitionedState(
+ stateWindow,
+ windowSerializer,
+ windowStateDescriptor);
}
- windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
} else {
windowState = getPartitionedState(
context.window,
@@ -230,19 +230,28 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
windowStateDescriptor);
}
- Iterable<StreamRecord<IN>> contents = windowState.get();
- if (contents == null) {
- // if we have no state, there is nothing to do
- return;
+ Iterable<StreamRecord<IN>> contents = null;
+ if (windowState != null) {
+ contents = windowState.get();
}
- TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
- if (triggerResult.isFire()) {
- fire(context.window, contents, windowState);
+ if (contents != null) {
+ TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+ if (triggerResult.isFire()) {
+ emitWindowContents(context.window, contents, windowState);
+ }
+ if (triggerResult.isPurge()) {
+ windowState.clear();
+ }
+ }
+
+ if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
+ clearAllState(context.window, windowState, mergingWindows);
}
- if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
- cleanup(context.window, windowState, mergingWindows);
+ if (mergingWindows != null) {
+ // need to make sure to update the merging state in state
+ mergingWindows.persist();
}
}
@@ -260,33 +269,44 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(context.window);
if (stateWindow == null) {
- // then the window is already purged and this is a cleanup
- // timer set due to allowed lateness that has nothing to clean,
- // so it is safe to just ignore
- return;
+ // timer firing for non-existent window, still have to run the cleanup logic
+ windowState = null;
+ } else {
+ windowState = getPartitionedState(
+ stateWindow,
+ windowSerializer,
+ windowStateDescriptor);
}
- windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
} else {
windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
}
- Iterable<StreamRecord<IN>> contents = windowState.get();
- if (contents == null) {
- // if we have no state, there is nothing to do
- return;
+ Iterable<StreamRecord<IN>> contents = null;
+ if (windowState != null) {
+ contents = windowState.get();
}
- TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
- if (triggerResult.isFire()) {
- fire(context.window, contents, windowState);
+ if (contents != null) {
+ TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+ if (triggerResult.isFire()) {
+ emitWindowContents(context.window, contents, windowState);
+ }
+ if (triggerResult.isPurge()) {
+ windowState.clear();
+ }
}
- if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
- cleanup(context.window, windowState, mergingWindows);
+ if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
+ clearAllState(context.window, windowState, mergingWindows);
+ }
+
+ if (mergingWindows != null) {
+ // need to make sure to update the merging state in state
+ mergingWindows.persist();
}
}
- private void fire(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
+ private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
// Work around type system restrictions...
@@ -320,6 +340,18 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
}
}
+ private void clearAllState(
+ W window,
+ ListState<StreamRecord<IN>> windowState,
+ MergingWindowSet<W> mergingWindows) throws Exception {
+
+ windowState.clear();
+ context.clear();
+ if (mergingWindows != null) {
+ mergingWindows.retireWindow(window);
+ mergingWindows.persist();
+ }
+ }
/**
* {@code EvictorContext} is a utility for handling {@code Evictor} invocations. It can be reused
@@ -366,18 +398,6 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
}
}
- private void cleanup(W window,
- ListState<StreamRecord<IN>> windowState,
- MergingWindowSet<W> mergingWindows) throws Exception {
-
- windowState.clear();
- if (mergingWindows != null) {
- mergingWindows.retireWindow(window);
- mergingWindows.persist();
- }
- context.clear();
- }
-
@Override
public void open() throws Exception {
super.open();
http://git-wip-us.apache.org/repos/asf/flink/blob/a9189c20/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 5ed5a4e..3144b6d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -348,14 +348,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
if (contents == null) {
continue;
}
- fire(actualWindow, contents);
+ emitWindowContents(actualWindow, contents);
}
if (triggerResult.isPurge()) {
- cleanup(actualWindow, windowState, mergingWindows);
- } else {
- registerCleanupTimer(actualWindow);
+ windowState.clear();
}
+ registerCleanupTimer(actualWindow);
}
// need to make sure to update the merging state in state
@@ -382,14 +381,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
if (contents == null) {
continue;
}
- fire(window, contents);
+ emitWindowContents(window, contents);
}
if (triggerResult.isPurge()) {
- cleanup(window, windowState, null);
- } else {
- registerCleanupTimer(window);
+ windowState.clear();
}
+ registerCleanupTimer(window);
}
}
}
@@ -406,32 +404,40 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(context.window);
if (stateWindow == null) {
- // then the window is already purged and this is a cleanup
- // timer set due to allowed lateness that has nothing to clean,
- // so it is safe to just ignore
- return;
+ // timer firing for non-existent window, ignore
+ windowState = null;
+ } else {
+ windowState = getPartitionedState(
+ stateWindow,
+ windowSerializer,
+ windowStateDescriptor);
}
- windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
} else {
- windowState = getPartitionedState(
- context.window,
- windowSerializer,
- windowStateDescriptor);
+ windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
}
- ACC contents = windowState.get();
- if (contents == null) {
- // if we have no state, there is nothing to do
- return;
+ ACC contents = null;
+ if (windowState != null) {
+ contents = windowState.get();
}
- TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
- if (triggerResult.isFire()) {
- fire(context.window, contents);
+ if (contents != null) {
+ TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+ if (triggerResult.isFire()) {
+ emitWindowContents(context.window, contents);
+ }
+ if (triggerResult.isPurge()) {
+ windowState.clear();
+ }
}
- if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
- cleanup(context.window, windowState, mergingWindows);
+ if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
+ clearAllState(context.window, windowState, mergingWindows);
+ }
+
+ if (mergingWindows != null) {
+ // need to make sure to update the merging state in state
+ mergingWindows.persist();
}
}
@@ -447,54 +453,67 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(context.window);
if (stateWindow == null) {
- // then the window is already purged and this is a cleanup
- // timer set due to allowed lateness that has nothing to clean,
- // so it is safe to just ignore
- return;
+ // timer firing for non-existent window, ignore
+ windowState = null;
+ } else {
+ windowState = getPartitionedState(
+ stateWindow,
+ windowSerializer,
+ windowStateDescriptor);
}
- windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
} else {
windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
}
- ACC contents = windowState.get();
- if (contents == null) {
- // if we have no state, there is nothing to do
- return;
+ ACC contents = null;
+ if (windowState != null) {
+ contents = windowState.get();
}
- TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
- if (triggerResult.isFire()) {
- fire(context.window, contents);
+ if (contents != null) {
+ TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+ if (triggerResult.isFire()) {
+ emitWindowContents(context.window, contents);
+ }
+ if (triggerResult.isPurge()) {
+ windowState.clear();
+ }
}
- if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
- cleanup(context.window, windowState, mergingWindows);
+ if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
+ clearAllState(context.window, windowState, mergingWindows);
+ }
+
+ if (mergingWindows != null) {
+ // need to make sure to update the merging state in state
+ mergingWindows.persist();
}
}
/**
- * Cleans up the window state if the provided {@link TriggerResult} requires so, or if it
- * is time to do so (see {@link #isCleanupTime(Window, long)}). The caller must ensure that the
+ * Drops all state for the given window and calls
+ * {@link Trigger#clear(Window, Trigger.TriggerContext)}.
+ *
+ * <p>The caller must ensure that the
* correct key is set in the state backend and the context object.
*/
- private void cleanup(W window,
- AppendingState<IN, ACC> windowState,
- MergingWindowSet<W> mergingWindows) throws Exception {
+ private void clearAllState(
+ W window,
+ AppendingState<IN, ACC> windowState,
+ MergingWindowSet<W> mergingWindows) throws Exception {
windowState.clear();
+ context.clear();
if (mergingWindows != null) {
mergingWindows.retireWindow(window);
mergingWindows.persist();
}
- context.clear();
}
/**
- * Triggers the window computation if the provided {@link TriggerResult} requires so.
- * The caller must ensure that the correct key is set in the state backend and the context object.
+ * Emits the contents of the given window using the {@link InternalWindowFunction}.
*/
@SuppressWarnings("unchecked")
- private void fire(W window, ACC contents) throws Exception {
+ private void emitWindowContents(W window, ACC contents) throws Exception {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
userFunction.apply(context.key, context.window, contents, timestampedCollector);
}
@@ -517,12 +536,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
/**
- * Decides if a window is currently late or not, based on the current
- * watermark, i.e. the current event time, and the allowed lateness.
- * @param window
- * The collection of windows returned by the {@link WindowAssigner}.
- * @return The windows (among the {@code eligibleWindows}) for which the element should still be
- * considered when triggering.
+ * Returns {@code true} if the watermark is after the end timestamp plus the allowed lateness
+ * of the given window.
*/
protected boolean isLate(W window) {
return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
@@ -535,6 +550,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
*/
protected void registerCleanupTimer(W window) {
long cleanupTime = cleanupTime(window);
+ if (cleanupTime == Long.MAX_VALUE) {
+ // don't set a GC timer for "end of time"
+ return;
+ }
+
if (windowAssigner.isEventTime()) {
context.registerEventTimeTimer(cleanupTime);
} else {
@@ -549,6 +569,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
*/
protected void deleteCleanupTimer(W window) {
long cleanupTime = cleanupTime(window);
+ if (cleanupTime == Long.MAX_VALUE) {
+ // no need to clean up because we didn't set one
+ return;
+ }
if (windowAssigner.isEventTime()) {
context.deleteEventTimeTimer(cleanupTime);
} else {
@@ -566,24 +590,19 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* @param window the window whose cleanup time we are computing.
*/
private long cleanupTime(W window) {
- long cleanupTime = window.maxTimestamp() + allowedLateness;
- return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
+ if (windowAssigner.isEventTime()) {
+ long cleanupTime = window.maxTimestamp() + allowedLateness;
+ return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
+ } else {
+ return window.maxTimestamp();
+ }
}
/**
- * Decides if it is time to clean up the window state.
- * Clean up time for a window is:
- * <li> if it is event time, after the watermark passes the end of the window plus the user-specified allowed lateness
- * <li> if it is processing time, after the processing time at the node passes the end of the window.
- * @param window
- * the window to clean
- * @param time
- * the current time (event or processing depending on the {@link WindowAssigner}
- * @return {@code true} if it is time to clean up the window state, {@code false} otherwise.
+ * Returns {@code true} if the given time is the cleanup time for the given window.
*/
protected final boolean isCleanupTime(W window, long time) {
- long cleanupTime = cleanupTime(window);
- return cleanupTime == time;
+ return time == cleanupTime(window);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/a9189c20/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 0e2d1e8..e682e2d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -1599,20 +1599,21 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
expected.add(new Watermark(14600));
- // dropped as late
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
+ expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 14600L), 14599));
+
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
testHarness.processWatermark(new Watermark(20000));
- expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499));
+ expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 17500L), 17499));
expected.add(new Watermark(20000));
testHarness.processWatermark(new Watermark(100000));
expected.add(new Watermark(100000));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
testHarness.close();
}
@@ -1778,7 +1779,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
- expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 13000L), 12999));
+ expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 14600L), 14599));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
@@ -1786,7 +1787,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
testHarness.processWatermark(new Watermark(20000));
- expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499));
+ expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 17500L), 17499));
expected.add(new Watermark(20000));
testHarness.processWatermark(new Watermark(100000));