You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/06/21 10:17:19 UTC
[2/6] flink git commit: [FLINK-3714] Add Support for "Allowed
Lateness"
[FLINK-3714] Add Support for "Allowed Lateness"
Handle late elements and take care
of cleaning the window state.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/34a8b03d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/34a8b03d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/34a8b03d
Branch: refs/heads/master
Commit: 34a8b03d2ad40db7dc00fa47923b96374c289838
Parents: f2e9c52
Author: kl0u <kk...@gmail.com>
Authored: Tue May 31 17:13:58 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 21 12:14:06 2016 +0200
----------------------------------------------------------------------
.../api/datastream/AllWindowedStream.java | 59 +-
.../api/datastream/WindowedStream.java | 53 +-
.../assigners/EventTimeSessionWindows.java | 5 +
.../api/windowing/assigners/GlobalWindows.java | 5 +
.../assigners/ProcessingTimeSessionWindows.java | 5 +
.../assigners/SlidingEventTimeWindows.java | 5 +
.../assigners/SlidingProcessingTimeWindows.java | 5 +
.../assigners/TumblingEventTimeWindows.java | 5 +
.../TumblingProcessingTimeWindows.java | 5 +
.../api/windowing/assigners/WindowAssigner.java | 6 +
.../api/windowing/evictors/CountEvictor.java | 2 +-
.../windowing/triggers/EventTimeTrigger.java | 9 +-
.../windowing/EvictingWindowOperator.java | 218 +++--
.../operators/windowing/WindowOperator.java | 327 ++++---
.../windowing/EvictingWindowOperatorTest.java | 75 +-
.../operators/windowing/WindowOperatorTest.java | 873 ++++++++++++++++++-
.../streaming/api/scala/AllWindowedStream.scala | 17 +-
.../streaming/api/scala/WindowedStream.scala | 15 +
18 files changed, 1480 insertions(+), 209 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 5a1b56d..e5dacc0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -42,6 +42,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
@@ -87,6 +88,8 @@ public class AllWindowedStream<T, W extends Window> {
/** The evictor that is used for evicting elements before window evaluation. */
private Evictor<? super T, ? super W> evictor;
+ /** The user-specified allowed lateness. */
+ private long allowedLateness = Long.MAX_VALUE;
@PublicEvolving
public AllWindowedStream(DataStream<T> input,
@@ -110,6 +113,26 @@ public class AllWindowedStream<T, W extends Window> {
}
/**
+ * Sets the allowed lateness to a user-specified value (only valid for event-time windows).
+ * If not explicitly set, the allowed lateness is {@link Long#MAX_VALUE}.
+ * @param lateness the allowed lateness; must not be negative
+ * @return this stream, so that calls can be chained
+ * @throws IllegalArgumentException if negative, or non-zero with a processing-time {@link WindowAssigner}
+ */
+ @PublicEvolving
+ public AllWindowedStream<T, W> allowedLateness(Time lateness) {
+ long millis = lateness.toMilliseconds();
+ if (millis < 0) { // validate the requested value, not the current field
+ throw new IllegalArgumentException("The allowed lateness cannot be negative.");
+ } else if (millis != 0 && !windowAssigner.isEventTime()) {
+ throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows.");
+ } else {
+ this.allowedLateness = millis;
+ }
+ return this;
+ }
+
+ /**
* Sets the {@code Evictor} that should be used to evict elements from a window before emission.
*
* <p>
@@ -251,14 +274,16 @@ public class AllWindowedStream<T, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
- operator = new EvictingWindowOperator<>(windowAssigner,
+ operator =
+ new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableAllWindowFunction<>(function),
trigger,
- evictor);
+ evictor,
+ allowedLateness);
} else {
ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
@@ -266,13 +291,15 @@ public class AllWindowedStream<T, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
- operator = new WindowOperator<>(windowAssigner,
+ operator =
+ new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableAllWindowFunction<>(function),
- trigger);
+ trigger,
+ allowedLateness);
}
return input.transform(opName, resultType, operator).setParallelism(1);
@@ -335,14 +362,16 @@ public class AllWindowedStream<T, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
- operator = new EvictingWindowOperator<>(windowAssigner,
+ operator =
+ new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableAllWindowFunction<>(new ReduceApplyAllWindowFunction<>(reduceFunction, function)),
trigger,
- evictor);
+ evictor,
+ allowedLateness);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
@@ -351,13 +380,15 @@ public class AllWindowedStream<T, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
- operator = new WindowOperator<>(windowAssigner,
+ operator =
+ new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalSingleValueAllWindowFunction<>(function),
- trigger);
+ trigger,
+ allowedLateness);
}
return input.transform(opName, resultType, operator).setParallelism(1);
@@ -425,14 +456,16 @@ public class AllWindowedStream<T, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
- operator = new EvictingWindowOperator<>(windowAssigner,
+ operator =
+ new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableAllWindowFunction<>(new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function)),
trigger,
- evictor);
+ evictor,
+ allowedLateness);
} else {
FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
@@ -442,13 +475,15 @@ public class AllWindowedStream<T, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
- operator = new WindowOperator<>(windowAssigner,
+ operator =
+ new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalSingleValueAllWindowFunction<>(function),
- trigger);
+ trigger,
+ allowedLateness);
}
return input.transform(opName, resultType, operator).setParallelism(1);
http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 84290b2..6110480 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -45,6 +45,7 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeW
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -98,6 +99,8 @@ public class WindowedStream<T, K, W extends Window> {
/** The evictor that is used for evicting elements before window evaluation. */
private Evictor<? super T, ? super W> evictor;
+ /** The user-specified allowed lateness. */
+ private long allowedLateness = Long.MAX_VALUE;
@PublicEvolving
public WindowedStream(KeyedStream<T, K> input,
@@ -121,6 +124,26 @@ public class WindowedStream<T, K, W extends Window> {
}
/**
+ * Sets the allowed lateness to a user-specified value (only valid for event-time windows).
+ * If not explicitly set, the allowed lateness is {@link Long#MAX_VALUE}.
+ * @param lateness the allowed lateness; must not be negative
+ * @return this stream, so that calls can be chained
+ * @throws IllegalArgumentException if negative, or non-zero with a processing-time {@link WindowAssigner}
+ */
+ @PublicEvolving
+ public WindowedStream<T, K, W> allowedLateness(Time lateness) {
+ long millis = lateness.toMilliseconds();
+ if (millis < 0) { // validate the requested value, not the current field
+ throw new IllegalArgumentException("The allowed lateness cannot be negative.");
+ } else if (millis != 0 && !windowAssigner.isEventTime()) {
+ throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows.");
+ } else {
+ this.allowedLateness = millis;
+ }
+ return this;
+ }
+
+ /**
* Sets the {@code Evictor} that should be used to evict elements from a window before emission.
*
* <p>
@@ -272,14 +295,16 @@ public class WindowedStream<T, K, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
- operator = new EvictingWindowOperator<>(windowAssigner,
+ operator =
+ new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableWindowFunction<>(function),
trigger,
- evictor);
+ evictor,
+ allowedLateness);
} else {
ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
@@ -287,13 +312,15 @@ public class WindowedStream<T, K, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
- operator = new WindowOperator<>(windowAssigner,
+ operator =
+ new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableWindowFunction<>(function),
- trigger);
+ trigger,
+ allowedLateness);
}
return input.transform(opName, resultType, operator);
@@ -356,14 +383,16 @@ public class WindowedStream<T, K, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
- operator = new EvictingWindowOperator<>(windowAssigner,
+ operator =
+ new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
trigger,
- evictor);
+ evictor,
+ allowedLateness);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
@@ -372,13 +401,15 @@ public class WindowedStream<T, K, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
- operator = new WindowOperator<>(windowAssigner,
+ operator =
+ new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(function),
- trigger);
+ trigger,
+ allowedLateness);
}
return input.transform(opName, resultType, operator);
@@ -453,7 +484,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function)),
trigger,
- evictor);
+ evictor,
+ allowedLateness);
} else {
FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
@@ -469,7 +501,8 @@ public class WindowedStream<T, K, W extends Window> {
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(function),
- trigger);
+ trigger,
+ allowedLateness);
}
return input.transform(opName, resultType, operator);
http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
index ed5add5..64c14cd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
@@ -81,6 +81,11 @@ public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeW
return new TimeWindow.Serializer();
}
+ @Override
+ public boolean isEventTime() {
+ return true;
+ }
+
/**
* Merge overlapping {@link TimeWindow}s.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index dcf440c..71101f6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -102,4 +102,9 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new GlobalWindow.Serializer();
}
+
+ @Override
+ public boolean isEventTime() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
index 608ebbc..0e1682d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
@@ -81,6 +81,11 @@ public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object,
return new TimeWindow.Serializer();
}
+ @Override
+ public boolean isEventTime() {
+ return false;
+ }
+
/**
* Merge overlapping {@link TimeWindow}s.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
index 89b216e..83511df 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@@ -109,4 +109,9 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
+
+ @Override
+ public boolean isEventTime() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
index 4b91986..d2b0707 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -101,4 +101,9 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
+
+ @Override
+ public boolean isEventTime() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
index 1f61281..70432a6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@@ -95,4 +95,9 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
+
+ @Override
+ public boolean isEventTime() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index 436a9ed..3ec55d0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -86,4 +86,9 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
+
+ @Override
+ public boolean isEventTime() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
index 683ed42..0b49bce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -63,4 +63,10 @@ public abstract class WindowAssigner<T, W extends Window> implements Serializabl
* this {@code WindowAssigner}.
*/
public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
+
+ /**
+ * Returns {@code true} if elements are assigned to windows based on event time,
+ * {@code false} otherwise.
+ */
+ public abstract boolean isEventTime();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
index e989cbc..dc82521 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
@@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
/**
- * An {@link Evictor} that keeps only a certain amount of elements.
+ * An {@link Evictor} that keeps up to a certain amount of elements.
*
* @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
index 75c6a9d..a87e436 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -36,12 +36,17 @@ public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
ctx.registerEventTimeTimer(window.maxTimestamp());
- return TriggerResult.CONTINUE;
+
+ return (window.maxTimestamp() <= ctx.getCurrentWatermark()) ?
+ TriggerResult.FIRE_AND_PURGE :
+ TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
- return TriggerResult.FIRE_AND_PURGE;
+ return time == window.maxTimestamp() ?
+ TriggerResult.FIRE_AND_PURGE :
+ TriggerResult.CONTINUE;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 84ee0b9..fa1c894 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
@@ -70,8 +71,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor,
InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
Trigger<? super IN, ? super W> trigger,
- Evictor<? super IN, ? super W> evictor) {
- super(windowAssigner, windowSerializer, keySelector, keySerializer, null, windowFunction, trigger);
+ Evictor<? super IN, ? super W> evictor,
+ long allowedLateness) {
+
+ super(windowAssigner, windowSerializer, keySelector,
+ keySerializer, null, windowFunction, trigger, allowedLateness);
this.evictor = requireNonNull(evictor);
this.windowStateDescriptor = windowStateDescriptor;
}
@@ -79,8 +83,10 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
@Override
@SuppressWarnings("unchecked")
public void processElement(StreamRecord<IN> element) throws Exception {
- Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(),
- element.getTimestamp());
+
+ Collection<W> elementWindows = windowAssigner.assignWindows(
+ element.getValue(),
+ element.getTimestamp());
final K key = (K) getStateBackend().getCurrentKey();
@@ -112,20 +118,27 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
for (W m : mergedWindows) {
context.window = m;
context.clear();
+ deleteCleanupTimer(m);
}
// merge the merged state windows into the newly resulting state window
- getStateBackend().mergePartitionedStates(stateWindowResult,
- mergedStateWindows,
- windowSerializer,
- (StateDescriptor<? extends MergingState<?, ?>, ?>) windowStateDescriptor);
+ getStateBackend().mergePartitionedStates(
+ stateWindowResult,
+ mergedStateWindows,
+ windowSerializer,
+ (StateDescriptor<? extends MergingState<?, ?>, ?>) windowStateDescriptor);
}
});
+ // check if the window is already inactive
+ if (isLate(actualWindow)) {
+ LOG.info("Dropped element " + element + " for window " + actualWindow + " due to lateness.");
+ continue;
+ }
+
W stateWindow = mergingWindows.getStateWindow(actualWindow);
- ListState<StreamRecord<IN>> windowState = getPartitionedState(stateWindow,
- windowSerializer,
- windowStateDescriptor);
+ ListState<StreamRecord<IN>> windowState = getPartitionedState(
+ stateWindow, windowSerializer, windowStateDescriptor);
windowState.add(element);
context.key = key;
@@ -134,77 +147,166 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
// we might have already fired because of a merge but still call onElement
// on the (possibly merged) window
TriggerResult triggerResult = context.onElement(element);
-
- TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult,
- mergeTriggerResult.f0);
-
- processTriggerResult(combinedTriggerResult, actualWindow);
+ TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0);
+ fireOrContinue(combinedTriggerResult, actualWindow, windowState);
+
+ if (combinedTriggerResult.isPurge()) {
+ cleanup(actualWindow, windowState, mergingWindows);
+ } else {
+ registerCleanupTimer(actualWindow);
+ }
}
} else {
for (W window : elementWindows) {
- ListState<StreamRecord<IN>> windowState = getPartitionedState(window,
- windowSerializer,
- windowStateDescriptor);
+ // check if the window is already inactive
+ if (isLate(window)) {
+ LOG.info("Dropped element " + element + " for window " + window + " due to lateness.");
+ continue;
+ }
+ ListState<StreamRecord<IN>> windowState = getPartitionedState(
+ window, windowSerializer, windowStateDescriptor);
windowState.add(element);
context.key = key;
context.window = window;
+
TriggerResult triggerResult = context.onElement(element);
+ fireOrContinue(triggerResult, window, windowState);
- processTriggerResult(triggerResult, window);
+ if (triggerResult.isPurge()) {
+ cleanup(window, windowState, null);
+ } else {
+ registerCleanupTimer(window);
+ }
}
}
}
@Override
- @SuppressWarnings("unchecked,rawtypes")
- protected void processTriggerResult(TriggerResult triggerResult, W window) throws Exception {
- if (!triggerResult.isFire() && !triggerResult.isPurge()) {
- // do nothing
- return;
- }
+ public void processWatermark(Watermark mark) throws Exception {
+ boolean fire;
+ do {
+ Timer<K, W> timer = watermarkTimersQueue.peek();
+ if (timer != null && timer.timestamp <= mark.getTimestamp()) {
+ fire = true;
+
+ watermarkTimers.remove(timer);
+ watermarkTimersQueue.remove();
+
+ context.key = timer.key;
+ context.window = timer.window;
+ setKeyContext(timer.key);
+
+ ListState<StreamRecord<IN>> windowState;
+ MergingWindowSet<W> mergingWindows = null;
+
+ if (windowAssigner instanceof MergingWindowAssigner) {
+ mergingWindows = getMergingWindowSet();
+ W stateWindow = mergingWindows.getStateWindow(context.window);
+ windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+ } else {
+ windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+ }
+
+ TriggerResult triggerResult = context.onEventTime(timer.timestamp);
+ fireOrContinue(triggerResult, context.window, windowState);
+
+ if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
+ cleanup(timer.window, windowState, mergingWindows);
+ }
+
+ } else {
+ fire = false;
+ }
+ } while (fire);
- ListState<StreamRecord<IN>> windowState;
+ output.emitWatermark(mark);
- MergingWindowSet<W> mergingWindows = null;
+ this.currentWatermark = mark.getTimestamp();
+ }
- if (windowAssigner instanceof MergingWindowAssigner) {
- mergingWindows = getMergingWindowSet();
- W stateWindow = mergingWindows.getStateWindow(window);
- windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+ @Override
+ public void trigger(long time) throws Exception {
+ boolean fire;
- } else {
- windowState = getPartitionedState(window, windowSerializer, windowStateDescriptor);
- }
+ //Remove information about the triggering task
+ processingTimeTimerFutures.remove(time);
+ processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time));
- if (triggerResult.isFire()) {
- timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
- Iterable<StreamRecord<IN>> contents = windowState.get();
-
- // Work around type system restrictions...
- int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);
-
- FluentIterable<IN> projectedContents = FluentIterable
- .from(contents)
- .skip(toEvict)
- .transform(new Function<StreamRecord<IN>, IN>() {
- @Override
- public IN apply(StreamRecord<IN> input) {
- return input.getValue();
- }
- });
- userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
- }
- if (triggerResult.isPurge()) {
- windowState.clear();
- if (mergingWindows != null) {
- mergingWindows.retireWindow(window);
+ do {
+ Timer<K, W> timer = processingTimeTimersQueue.peek();
+ if (timer != null && timer.timestamp <= time) {
+ fire = true;
+
+ processingTimeTimers.remove(timer);
+ processingTimeTimersQueue.remove();
+
+ context.key = timer.key;
+ context.window = timer.window;
+ setKeyContext(timer.key);
+
+ ListState<StreamRecord<IN>> windowState;
+ MergingWindowSet<W> mergingWindows = null;
+
+ if (windowAssigner instanceof MergingWindowAssigner) {
+ mergingWindows = getMergingWindowSet();
+ W stateWindow = mergingWindows.getStateWindow(context.window);
+ windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+ } else {
+ windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+ }
+
+ TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
+ fireOrContinue(triggerResult, context.window, windowState);
+
+ if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
+ cleanup(timer.window, windowState, mergingWindows);
+ }
+
+ } else {
+ fire = false;
}
- context.clear();
+ } while (fire);
+ }
+
+ private void fireOrContinue(TriggerResult triggerResult,
+ W window,
+ ListState<StreamRecord<IN>> windowState) throws Exception {
+ if (!triggerResult.isFire()) {
+ return;
+ }
+
+ timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
+ Iterable<StreamRecord<IN>> contents = windowState.get();
+
+ // Work around type system restrictions...
+ int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);
+
+ FluentIterable<IN> projectedContents = FluentIterable
+ .from(contents)
+ .skip(toEvict)
+ .transform(new Function<StreamRecord<IN>, IN>() {
+ @Override
+ public IN apply(StreamRecord<IN> input) {
+ return input.getValue();
+ }
+ });
+ userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
+ }
+
+ private void cleanup(W window,
+ ListState<StreamRecord<IN>> windowState,
+ MergingWindowSet<W> mergingWindows) throws Exception {
+
+ windowState.clear();
+ if (mergingWindows != null) {
+ mergingWindows.retireWindow(window);
}
+ context.clear();
+ deleteCleanupTimer(window);
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 18020b3..95ad1b0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -59,6 +59,7 @@ import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -78,9 +79,9 @@ import static java.util.Objects.requireNonNull;
* {@link Trigger}.
*
* <p>
- * When an element arrives it gets assigned a key using a {@link KeySelector} and it get's
+ * When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
* assigned to zero or more windows using a {@link WindowAssigner}. Based on this the element
- * is put into panes. A pane is the bucket of elements that have the same key and same
+ * is put into panes. A pane is the bucket of elements that have the same key and belong to the same
* {@code Window}. An element can be in multiple panes of it was assigned to multiple windows by the
* {@code WindowAssigner}.
*
@@ -130,6 +131,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
*/
protected final TypeSerializer<W> windowSerializer;
+ /**
+ * The allowed lateness for elements. This is used for:
+ * <ul>
+ * <li>Deciding if an element should be dropped from a window due to lateness.
+ * <li>Clearing the state of a window once the watermark (for event-time assigners) or the
+ * processing time (otherwise) reaches the {@code window.maxTimestamp + allowedLateness} landmark.
+ * </ul>
+ */
+ protected final long allowedLateness;
+
// ------------------------------------------------------------------------
// State that is not checkpointed
// ------------------------------------------------------------------------
@@ -177,7 +188,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
TypeSerializer<K> keySerializer,
StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
InternalWindowFunction<ACC, OUT, K, W> windowFunction,
- Trigger<? super IN, ? super W> trigger) {
+ Trigger<? super IN, ? super W> trigger,
+ long allowedLateness) {
super(windowFunction);
@@ -189,6 +201,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
this.windowStateDescriptor = windowStateDescriptor;
this.trigger = requireNonNull(trigger);
+ Preconditions.checkArgument(allowedLateness >= 0);
+ this.allowedLateness = allowedLateness;
+
setChainingStrategy(ChainingStrategy.ALWAYS);
}
@@ -264,7 +279,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
@Override
@SuppressWarnings("unchecked")
public void processElement(StreamRecord<IN> element) throws Exception {
- Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
+
+ Collection<W> elementWindows = windowAssigner.assignWindows(
+ element.getValue(),
+ element.getTimestamp());
final K key = (K) getStateBackend().getCurrentKey();
@@ -276,7 +294,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// element because we always eagerly merge
final Tuple1<TriggerResult> mergeTriggerResult = new Tuple1<>(TriggerResult.CONTINUE);
-
// adding the new window might result in a merge, in that case the actualWindow
// is the merged window and we work with that. If we don't merge then
// actualWindow == window
@@ -294,18 +311,27 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
for (W m: mergedWindows) {
context.window = m;
context.clear();
+ deleteCleanupTimer(m);
}
// merge the merged state windows into the newly resulting state window
- getStateBackend().mergePartitionedStates(stateWindowResult,
- mergedStateWindows,
- windowSerializer,
- (StateDescriptor<? extends MergingState<?,?>, ?>) windowStateDescriptor);
+ getStateBackend().mergePartitionedStates(
+ stateWindowResult,
+ mergedStateWindows,
+ windowSerializer,
+ (StateDescriptor<? extends MergingState<?,?>, ?>) windowStateDescriptor);
}
});
+ // check if the window is already inactive
+ if (isLate(actualWindow)) {
+ LOG.info("Dropped element " + element+ " for window " + actualWindow + " due to lateness.");
+ continue;
+ }
+
W stateWindow = mergingWindows.getStateWindow(actualWindow);
- AppendingState<IN, ACC> windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+ AppendingState<IN, ACC> windowState = getPartitionedState(
+ stateWindow, windowSerializer, windowStateDescriptor);
windowState.add(element.getValue());
context.key = key;
@@ -314,104 +340,46 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// we might have already fired because of a merge but still call onElement
// on the (possibly merged) window
TriggerResult triggerResult = context.onElement(element);
-
TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0);
+ fireOrContinue(combinedTriggerResult, actualWindow, windowState);
- processTriggerResult(combinedTriggerResult, actualWindow);
+ if (combinedTriggerResult.isPurge()) {
+ cleanup(actualWindow, windowState, mergingWindows);
+ } else {
+ registerCleanupTimer(actualWindow);
+ }
}
-
} else {
for (W window: elementWindows) {
- AppendingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer,
- windowStateDescriptor);
+ // check if the window is already inactive
+ if (isLate(window)) {
+ LOG.info("Dropped element " + element + " for window " + window + " due to lateness.");
+ continue;
+ }
+ AppendingState<IN, ACC> windowState = getPartitionedState(
+ window, windowSerializer, windowStateDescriptor);
windowState.add(element.getValue());
context.key = key;
context.window = window;
- TriggerResult triggerResult = context.onElement(element);
-
- processTriggerResult(triggerResult, window);
- }
- }
- }
-
- /**
- * Retrieves the {@link MergingWindowSet} for the currently active key. The caller must
- * ensure that the correct key is set in the state backend.
- */
- @SuppressWarnings("unchecked")
- protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
- MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get((K) getStateBackend().getCurrentKey());
- if (mergingWindows == null) {
- // try to retrieve from state
-
- TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
- ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
- ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
-
- mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner, mergeState);
- mergeState.clear();
-
- mergingWindowsByKey.put((K) getStateBackend().getCurrentKey(), mergingWindows);
- }
- return mergingWindows;
- }
-
-
- /**
- * Process {@link TriggerResult} for the currently active key and the given window. The caller
- * must ensure that the correct key is set in the state backend and the context object.
- */
- @SuppressWarnings("unchecked")
- protected void processTriggerResult(TriggerResult triggerResult, W window) throws Exception {
- if (!triggerResult.isFire() && !triggerResult.isPurge()) {
- // do nothing
- return;
- }
-
- AppendingState<IN, ACC> windowState;
-
- MergingWindowSet<W> mergingWindows = null;
-
- if (windowAssigner instanceof MergingWindowAssigner) {
- mergingWindows = getMergingWindowSet();
- W stateWindow = mergingWindows.getStateWindow(window);
- windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
-
- } else {
- windowState = getPartitionedState(window, windowSerializer, windowStateDescriptor);
- }
-
- if (triggerResult.isFire()) {
- timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
- ACC contents = windowState.get();
- userFunction.apply(context.key, context.window, contents, timestampedCollector);
+ TriggerResult triggerResult = context.onElement(element);
+ fireOrContinue(triggerResult, window, windowState);
- }
- if (triggerResult.isPurge()) {
- windowState.clear();
- if (mergingWindows != null) {
- mergingWindows.retireWindow(window);
+ if (triggerResult.isPurge()) {
+ cleanup(window, windowState, null);
+ } else {
+ registerCleanupTimer(window);
+ }
}
- context.clear();
}
}
@Override
- public final void processWatermark(Watermark mark) throws Exception {
- processTriggersFor(mark);
-
- output.emitWatermark(mark);
-
- this.currentWatermark = mark.getTimestamp();
- }
-
- private void processTriggersFor(Watermark mark) throws Exception {
+ public void processWatermark(Watermark mark) throws Exception {
boolean fire;
-
do {
Timer<K, W> timer = watermarkTimersQueue.peek();
if (timer != null && timer.timestamp <= mark.getTimestamp()) {
@@ -423,16 +391,37 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
context.key = timer.key;
context.window = timer.window;
setKeyContext(timer.key);
+
+ AppendingState<IN, ACC> windowState;
+ MergingWindowSet<W> mergingWindows = null;
+
+ if (windowAssigner instanceof MergingWindowAssigner) {
+ mergingWindows = getMergingWindowSet();
+ W stateWindow = mergingWindows.getStateWindow(context.window);
+ windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+ } else {
+ windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+ }
+
TriggerResult triggerResult = context.onEventTime(timer.timestamp);
- processTriggerResult(triggerResult, context.window);
+ fireOrContinue(triggerResult, context.window, windowState);
+
+ if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
+ cleanup(timer.window, windowState, mergingWindows);
+ }
+
} else {
fire = false;
}
} while (fire);
+
+ output.emitWatermark(mark);
+
+ this.currentWatermark = mark.getTimestamp();
}
@Override
- public final void trigger(long time) throws Exception {
+ public void trigger(long time) throws Exception {
boolean fire;
//Remove information about the triggering task
@@ -450,17 +439,154 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
context.key = timer.key;
context.window = timer.window;
setKeyContext(timer.key);
+
+ AppendingState<IN, ACC> windowState;
+ MergingWindowSet<W> mergingWindows = null;
+
+ if (windowAssigner instanceof MergingWindowAssigner) {
+ mergingWindows = getMergingWindowSet();
+ W stateWindow = mergingWindows.getStateWindow(context.window);
+ windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+ } else {
+ windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+ }
+
TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
- processTriggerResult(triggerResult, context.window);
+ fireOrContinue(triggerResult, context.window, windowState);
+
+ if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
+ cleanup(timer.window, windowState, mergingWindows);
+ }
+
} else {
fire = false;
}
} while (fire);
+ }
+
+ /**
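+ * Clears the window state, retires the window from the given {@link MergingWindowSet} (if it is
+ * not {@code null}), clears the context and deletes the cleanup timer of the window. It is called on a purging
+ * {@link TriggerResult} or at cleanup time (see {@link #isCleanupTime(Window, long)}). The caller must ensure that the correct key is set in the state backend and the context object.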
+ */
+ private void cleanup(W window,
+ AppendingState<IN, ACC> windowState,
+ MergingWindowSet<W> mergingWindows) throws Exception {
+ windowState.clear();
+ if (mergingWindows != null) {
+ mergingWindows.retireWindow(window);
+ }
+ context.clear();
+ deleteCleanupTimer(window);
+ }
+
+ /**
+ * Triggers the window computation if the provided {@link TriggerResult} requires so.
+ * The caller must ensure that the correct key is set in the state backend and the context object.
+ */
+ @SuppressWarnings("unchecked")
+ private void fireOrContinue(TriggerResult triggerResult,
+ W window,
+ AppendingState<IN, ACC> windowState) throws Exception {
+ if (!triggerResult.isFire()) {
+ return;
+ }
+
+ timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
+ ACC contents = windowState.get();
+ userFunction.apply(context.key, context.window, contents, timestampedCollector);
+ }
+
+ /**
+ * Retrieves the {@link MergingWindowSet} for the currently active key.
+ * The caller must ensure that the correct key is set in the state backend.
+ */
+ @SuppressWarnings("unchecked")
+ protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
+ MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get((K) getStateBackend().getCurrentKey());
+ if (mergingWindows == null) {
+ // try to retrieve from state
+
+ TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
+ ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
+ ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
+
+ mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner, mergeState);
+ mergeState.clear();
+
+ mergingWindowsByKey.put((K) getStateBackend().getCurrentKey(), mergingWindows);
+ }
+ return mergingWindows;
+ }
- // Also check any watermark timers. We might have some in here since
- // Context.registerEventTimeTimer sets a trigger if an event-time trigger is registered
- // that is already behind the watermark.
- processTriggersFor(new Watermark(currentWatermark));
+ /**
+ * Decides if a window is currently late or not, based on the current
+ * watermark, i.e. the current event time, and the allowed lateness.
+ * @param window
+ * The window to check.
+ * @return {@code true} if the {@link WindowAssigner} is event-time based and the cleanup time of the window
+ * ({@code window.maxTimestamp + allowedLateness}) is not after the current watermark, {@code false} otherwise.
+ */
+ protected boolean isLate(W window) {
+ return (windowAssigner.isEventTime() && (getCleanupTimeForWindow(window) <= currentWatermark));
+ }
+
+ /**
+ * Registers a timer to cleanup the content of the window.
+ * @param window
+ * the window whose state to discard
+ */
+ protected void registerCleanupTimer(W window) {
+ long cleanupTime = getCleanupTimeForWindow(window);
+ if (windowAssigner.isEventTime()) {
+ context.registerEventTimeTimer(cleanupTime);
+ } else {
+ context.registerProcessingTimeTimer(cleanupTime);
+ }
+ }
+
+ /**
+ * Deletes the cleanup timer set for the contents of the provided window.
+ * @param window
+ * the window whose state to discard
+ */
+ protected void deleteCleanupTimer(W window) {
+ long cleanupTime = getCleanupTimeForWindow(window);
+ if (windowAssigner.isEventTime()) {
+ context.deleteEventTimeTimer(cleanupTime);
+ } else {
+ context.deleteProcessingTimeTimer(cleanupTime);
+ }
+ }
+
+ /**
+ * Returns the cleanup time for a window, which is
+ * {@code window.maxTimestamp + allowedLateness}. In
+ * case this leads to a value greater than {@link Long#MAX_VALUE}
+ * then a cleanup time of {@link Long#MAX_VALUE} is
+ * returned.
+ *
+ * @param window the window whose cleanup time we are computing.
+ */
+ private long getCleanupTimeForWindow(W window) {
+ long cleanupTime = window.maxTimestamp() + allowedLateness;
+ return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
+ }
+
+ /**
+ * Decides if it is time to clean up the window state.
+ * The clean up time of a window is {@code window.maxTimestamp + allowedLateness}, measured in
+ * event time (i.e. against the watermark) if the {@link WindowAssigner} is event-time based and in
+ * processing time otherwise. It is time to clean up when {@code time} is exactly equal to that value.
+ * @param window
+ * the window to clean
+ * @param time
+ * the current time (event or processing depending on the {@link WindowAssigner})
+ * @return {@code true} if it is time to clean up the window state, {@code false} otherwise.
+ */
+ protected final boolean isCleanupTime(W window, long time) {
+ long cleanupTime = getCleanupTimeForWindow(window);
+ return cleanupTime == time;
}
/**
@@ -562,13 +688,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
if (watermarkTimers.add(timer)) {
watermarkTimersQueue.add(timer);
}
-
- if (time <= currentWatermark) {
- // immediately schedule a trigger, so that we don't wait for the next
- // watermark update to fire the watermark trigger
- getRuntimeContext().registerTimer(System.currentTimeMillis(), WindowOperator.this);
- //No need to put it in processingTimeTimerFutures as this timer is never removed
- }
}
@Override
@@ -594,7 +713,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
if (watermarkTimers.remove(timer)) {
watermarkTimersQueue.remove(timer);
}
-
}
public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
@@ -627,7 +745,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
}
-
/**
* Internal class for keeping track of in-flight timers.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 6af7ac4..dc71440 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -30,9 +30,13 @@ import org.apache.flink.streaming.api.functions.windowing.ReduceIterableWindowFu
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -45,6 +49,7 @@ import org.junit.Test;
import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class EvictingWindowOperatorTest {
@@ -73,7 +78,8 @@ public class EvictingWindowOperatorTest {
stateDesc,
new InternalIterableWindowFunction<>(new ReduceIterableWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer())),
CountTrigger.of(WINDOW_SLIDE),
- CountEvictor.of(WINDOW_SIZE));
+ CountEvictor.of(WINDOW_SIZE),
+ 0);
operator.setInputType(inputType, new ExecutionConfig());
@@ -144,7 +150,8 @@ public class EvictingWindowOperatorTest {
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
CountTrigger.of(WINDOW_SLIDE),
- CountEvictor.of(WINDOW_SIZE));
+ CountEvictor.of(WINDOW_SIZE),
+ 0);
operator.setInputType(inputType, new ExecutionConfig());
@@ -194,7 +201,69 @@ public class EvictingWindowOperatorTest {
Assert.assertEquals("Close was not called.", 1, closeCalled.get());
}
- // ------------------------------------------------------------------------
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testTumblingWindowWithApply() throws Exception {
+ AtomicInteger closeCalled = new AtomicInteger(0);
+
+ final int WINDOW_SIZE = 4;
+
+ TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = new ListStateDescriptor<>("window-contents",
+ new StreamRecordSerializer<>(inputType.createSerializer(new ExecutionConfig())));
+
+ EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new EvictingWindowOperator<>(
+ TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ new TimeWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)),
+ EventTimeTrigger.create(),
+ CountEvictor.of(WINDOW_SIZE),
+ 0);
+
+ operator.setInputType(inputType, new ExecutionConfig());
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+ new OneInputStreamOperatorTestHarness<>(operator);
+
+ testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+ long initialTime = 0L;
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 100));
+
+ testHarness.processWatermark(new Watermark(1999));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 1997));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 1998));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 2310)); // not late but more than 4
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 2310));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2310));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2310));
+
+ testHarness.processWatermark(new Watermark(3999)); // now is the evictor
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+ expectedOutput.add(new Watermark(1999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), 3999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
+ expectedOutput.add(new Watermark(3999));
+
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
+ new EvictingWindowOperatorTest.ResultSortComparator());
+ testHarness.close();
+ }
+
+ // ------------------------------------------------------------------------
// UDFs
// ------------------------------------------------------------------------