You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/11 18:29:29 UTC
flink git commit: [hotfix] Add default Trigger for GlobalWindows
Repository: flink
Updated Branches:
refs/heads/master 5dfc897be -> a7a1cbc24
[hotfix] Add default Trigger for GlobalWindows
This also adds notes about possible non-parallelism for windowAll
windows.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7a1cbc2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7a1cbc2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7a1cbc2
Branch: refs/heads/master
Commit: a7a1cbc24547f1efe1a4569e5638b25c073d34f2
Parents: 5dfc897
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun Oct 11 18:21:16 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sun Oct 11 18:21:58 2015 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 22 +++++++++++++++
.../api/windowing/assigners/GlobalWindows.java | 29 ++++++++++++++++++--
.../EvictingNonKeyedWindowOperator.java | 4 ++-
.../windowing/EvictingWindowOperator.java | 4 ++-
.../windowing/NonKeyedWindowOperator.java | 8 ++++--
.../operators/windowing/WindowOperator.java | 10 ++++---
.../examples/windowing/WindowWordCount.java | 7 +----
.../flink/streaming/api/scala/DataStream.scala | 16 +++++++++++
8 files changed, 83 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a1cbc2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 42047b9..00991a7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -635,6 +635,12 @@ public class DataStream<T> {
* This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
* {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
* set using
+ *
+ * <p>
+ * Note: This operation can be inherently non-parallel since all elements have to pass through
+ * the same operator instance. (Only for special cases, such as aligned time windows, is
+ * it possible to perform this operation in parallel).
+ *
* {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
*
* @param size The size of the window.
@@ -652,6 +658,11 @@ public class DataStream<T> {
* set using
* {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
*
+ * <p>
+ * Note: This operation can be inherently non-parallel since all elements have to pass through
+ * the same operator instance. (Only for special cases, such as aligned time windows, is
+ * it possible to perform this operation in parallel).
+ *
* @param size The size of the window.
*/
public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size, AbstractTime slide) {
@@ -661,6 +672,11 @@ public class DataStream<T> {
/**
* Windows this {@code DataStream} into tumbling count windows.
*
+ * <p>
+ * Note: This operation can be inherently non-parallel since all elements have to pass through
+ * the same operator instance. (Only for special cases, such as aligned time windows, is
+ * it possible to perform this operation in parallel).
+ *
* @param size The size of the windows in number of elements.
*/
public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
@@ -669,6 +685,12 @@ public class DataStream<T> {
/**
* Windows this {@code DataStream} into sliding count windows.
+ *
+ * <p>
+ * Note: This operation can be inherently non-parallel since all elements have to pass through
+ * the same operator instance. (Only for special cases, such as aligned time windows, is
+ * it possible to perform this operation in parallel).
+ *
* @param size The size of the windows in number of elements.
* @param slide The slide interval in number of elements.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a1cbc2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index dbeb5ce..66c3287 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -29,7 +29,7 @@ import java.util.Collections;
*
* <p>
* Use this if you want to use a {@link Trigger} and
- * {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} to to flexible, policy based
+ * {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} to do flexible, policy based
* windows.
*/
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
@@ -44,7 +44,7 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
@Override
public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
- return null;
+ return new NeverTrigger();
}
@Override
@@ -61,4 +61,29 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
public static GlobalWindows create() {
return new GlobalWindows();
}
+
+ /**
+ * A trigger that never fires, as default Trigger for GlobalWindows.
+ */
+ private static class NeverTrigger implements Trigger<Object, GlobalWindow> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public TriggerResult onElement(Object element,
+ long timestamp,
+ GlobalWindow window,
+ TriggerContext ctx) {
+ return TriggerResult.CONTINUE;
+ }
+
+ @Override
+ public TriggerResult onTime(long time, TriggerContext ctx) {
+ return TriggerResult.CONTINUE;
+ }
+
+ @Override
+ public Trigger<Object, GlobalWindow> duplicate() {
+ return this;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a1cbc2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
index 31c7fed..bd3572e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
@@ -30,6 +30,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuff
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.util.Objects.requireNonNull;
+
/**
* Evicting window operator for non-keyed windows.
*
@@ -53,7 +55,7 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends N
Trigger<? super IN, ? super W> trigger,
Evictor<? super IN, ? super W> evictor) {
super(windowAssigner, windowBufferFactory, windowFunction, trigger);
- this.evictor = evictor;
+ this.evictor = requireNonNull(evictor);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a1cbc2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 49d58e4..51413bd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
+import static java.util.Objects.requireNonNull;
+
/**
* A {@link WindowOperator} that also allows an {@link Evictor} to be used.
*
@@ -60,7 +62,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
Trigger<? super IN, ? super W> trigger,
Evictor<? super IN, ? super W> evictor) {
super(windowAssigner, keySelector, windowBufferFactory, windowFunction, trigger);
- this.evictor = evictor;
+ this.evictor = requireNonNull(evictor);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a1cbc2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index e6aa53b..f35ffca 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -46,6 +46,8 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
+import static java.util.Objects.requireNonNull;
+
/**
* Window operator for non-keyed windows.
*
@@ -87,10 +89,10 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
super(windowFunction);
- this.windowAssigner = windowAssigner;
+ this.windowAssigner = requireNonNull(windowAssigner);
- this.windowBufferFactory = windowBufferFactory;
- this.triggerTemplate = trigger;
+ this.windowBufferFactory = requireNonNull(windowBufferFactory);
+ this.triggerTemplate = requireNonNull(trigger);
setChainingStrategy(ChainingStrategy.ALWAYS);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a1cbc2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 7762101..da36db1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -47,6 +47,8 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
+import static java.util.Objects.requireNonNull;
+
/**
* An operator that implements the logic for windowing based on a {@link WindowAssigner} and
* {@link Trigger}.
@@ -130,11 +132,11 @@ public class WindowOperator<K, IN, OUT, W extends Window>
super(windowFunction);
- this.windowAssigner = windowAssigner;
- this.keySelector = keySelector;
+ this.windowAssigner = requireNonNull(windowAssigner);
+ this.keySelector = requireNonNull(keySelector);
- this.windowBufferFactory = windowBufferFactory;
- this.triggerTemplate = trigger;
+ this.windowBufferFactory = requireNonNull(windowBufferFactory);
+ this.triggerTemplate = requireNonNull(trigger);
setChainingStrategy(ChainingStrategy.ALWAYS);
// forceInputCopy();
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a1cbc2/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
index 04352d8..f3d57bf 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
@@ -21,9 +21,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.examples.wordcount.WordCount;
/**
@@ -73,9 +70,7 @@ public class WindowWordCount {
text.flatMap(new WordCount.Tokenizer())
// create windows of windowSize records slided every slideSize records
.keyBy(0)
- .window(GlobalWindows.create())
- .evictor(CountEvictor.of(windowSize))
- .trigger(CountTrigger.of(slideSize))
+ .countWindow(windowSize, slideSize)
// group by the tuple field "0" and sum up tuple field "1"
.sum(1);
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a1cbc2/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 47dbf50..0565f52 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -587,6 +587,10 @@ class DataStream[T](javaStream: JavaStream[T]) {
* set using
* [[StreamExecutionEnvironment.setStreamTimeCharacteristic]].
*
+ * Note: This operation can be inherently non-parallel since all elements have to pass through
+ * the same operator instance. (Only for special cases, such as aligned time windows, is
+ * it possible to perform this operation in parallel).
+ *
* @param size The size of the window.
*/
def timeWindowAll(size: AbstractTime): AllWindowedStream[T, TimeWindow] = {
@@ -602,6 +606,10 @@ class DataStream[T](javaStream: JavaStream[T]) {
* set using
* [[StreamExecutionEnvironment.setStreamTimeCharacteristic]].
*
+ * Note: This operation can be inherently non-parallel since all elements have to pass through
+ * the same operator instance. (Only for special cases, such as aligned time windows, is
+ * it possible to perform this operation in parallel).
+ *
* @param size The size of the window.
*/
def timeWindowAll(size: AbstractTime, slide: AbstractTime): AllWindowedStream[T, TimeWindow] = {
@@ -612,6 +620,10 @@ class DataStream[T](javaStream: JavaStream[T]) {
/**
* Windows this [[DataStream]] into sliding count windows.
*
+ * Note: This operation can be inherently non-parallel since all elements have to pass through
+ * the same operator instance. (Only for special cases, such as aligned time windows, is
+ * it possible to perform this operation in parallel).
+ *
* @param size The size of the windows in number of elements.
* @param slide The slide interval in number of elements.
*/
@@ -622,6 +634,10 @@ class DataStream[T](javaStream: JavaStream[T]) {
/**
* Windows this [[DataStream]] into tumbling count windows.
*
+ * Note: This operation can be inherently non-parallel since all elements have to pass through
+ * the same operator instance. (Only for special cases, such as aligned time windows, is
+ * it possible to perform this operation in parallel).
+ *
* @param size The size of the windows in number of elements.
*/
def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow] = {