You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/11 18:29:29 UTC

flink git commit: [hotfix] Add default Trigger for GlobalWindows

Repository: flink
Updated Branches:
  refs/heads/master 5dfc897be -> a7a1cbc24


[hotfix] Add default Trigger for GlobalWindows

This also adds notes about possible non-parallelism for windowAll
windows.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7a1cbc2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7a1cbc2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7a1cbc2

Branch: refs/heads/master
Commit: a7a1cbc24547f1efe1a4569e5638b25c073d34f2
Parents: 5dfc897
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun Oct 11 18:21:16 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sun Oct 11 18:21:58 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    | 22 +++++++++++++++
 .../api/windowing/assigners/GlobalWindows.java  | 29 ++++++++++++++++++--
 .../EvictingNonKeyedWindowOperator.java         |  4 ++-
 .../windowing/EvictingWindowOperator.java       |  4 ++-
 .../windowing/NonKeyedWindowOperator.java       |  8 ++++--
 .../operators/windowing/WindowOperator.java     | 10 ++++---
 .../examples/windowing/WindowWordCount.java     |  7 +----
 .../flink/streaming/api/scala/DataStream.scala  | 16 +++++++++++
 8 files changed, 83 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a7a1cbc2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 42047b9..00991a7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -635,6 +635,12 @@ public class DataStream<T> {
 	 * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
 	 * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
 	 * set using
 	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+	 *
+	 * <p>
+	 * Note: This operation can be inherently non-parallel since all elements have to pass through
+	 * the same operator instance. (Only in special cases, such as aligned time windows, is
+	 * it possible to perform this operation in parallel.)
+	 *
 	 *
 	 * @param size The size of the window.
@@ -652,6 +658,11 @@ public class DataStream<T> {
 	 * set using
 	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
 	 *
+	 * <p>
+	 * Note: This operation can be inherently non-parallel since all elements have to pass through
+	 * the same operator instance. (Only in special cases, such as aligned time windows, is
+	 * it possible to perform this operation in parallel.)
+	 *
 	 * @param size The size of the window.
 	 */
 	public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size, AbstractTime slide) {
@@ -661,6 +672,11 @@ public class DataStream<T> {
 	/**
 	 * Windows this {@code DataStream} into tumbling count windows.
 	 *
+	 * <p>
+	 * Note: This operation can be inherently non-parallel since all elements have to pass through
+	 * the same operator instance. (Only in special cases, such as aligned time windows, is
+	 * it possible to perform this operation in parallel.)
+	 *
 	 * @param size The size of the windows in number of elements.
 	 */
 	public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
@@ -669,6 +685,12 @@ public class DataStream<T> {
 
 	/**
 	 * Windows this {@code DataStream} into sliding count windows.
+	 *
+	 * <p>
+	 * Note: This operation can be inherently non-parallel since all elements have to pass through
+	 * the same operator instance. (Only in special cases, such as aligned time windows, is
+	 * it possible to perform this operation in parallel.)
+	 *
 	 * @param size The size of the windows in number of elements.
 	 * @param slide The slide interval in number of elements.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a1cbc2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index dbeb5ce..66c3287 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -29,7 +29,7 @@ import java.util.Collections;
  *
  * <p>
  * Use this if you want to use a {@link Trigger} and
- * {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} to to flexible, policy based
+ * {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} to do flexible, policy based
  * windows.
  */
 public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
@@ -44,7 +44,7 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
 
 	@Override
 	public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
-		return null;
+		return new NeverTrigger();
 	}
 
 	@Override
@@ -61,4 +61,29 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
 	public static GlobalWindows create() {
 		return new GlobalWindows();
 	}
+
+	/**
+	 * Default {@link Trigger} for GlobalWindows: it never fires, since every callback returns CONTINUE.
+	 */
+	private static class NeverTrigger implements Trigger<Object, GlobalWindow> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public TriggerResult onElement(Object element,
+				long timestamp,
+				GlobalWindow window,
+				TriggerContext ctx) {
+				return TriggerResult.CONTINUE;
+		}
+
+		@Override
+		public TriggerResult onTime(long time, TriggerContext ctx) {
+			return TriggerResult.CONTINUE;
+		}
+
+		@Override
+		public Trigger<Object, GlobalWindow> duplicate() {
+			return this;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a1cbc2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
index 31c7fed..bd3572e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
@@ -30,6 +30,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuff
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * Evicting window operator for non-keyed windows.
  *
@@ -53,7 +55,7 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends N
 			Trigger<? super IN, ? super W> trigger,
 			Evictor<? super IN, ? super W> evictor) {
 		super(windowAssigner, windowBufferFactory, windowFunction, trigger);
-		this.evictor = evictor;
+		this.evictor = requireNonNull(evictor);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a1cbc2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 49d58e4..51413bd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * A {@link WindowOperator} that also allows an {@link Evictor} to be used.
  *
@@ -60,7 +62,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 			Trigger<? super IN, ? super W> trigger,
 			Evictor<? super IN, ? super W> evictor) {
 		super(windowAssigner, keySelector, windowBufferFactory, windowFunction, trigger);
-		this.evictor = evictor;
+		this.evictor = requireNonNull(evictor);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a1cbc2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index e6aa53b..f35ffca 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -46,6 +46,8 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * Window operator for non-keyed windows.
  *
@@ -87,10 +89,10 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 		super(windowFunction);
 
-		this.windowAssigner = windowAssigner;
+		this.windowAssigner = requireNonNull(windowAssigner);
 
-		this.windowBufferFactory = windowBufferFactory;
-		this.triggerTemplate = trigger;
+		this.windowBufferFactory = requireNonNull(windowBufferFactory);
+		this.triggerTemplate = requireNonNull(trigger);
 
 		setChainingStrategy(ChainingStrategy.ALWAYS);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a1cbc2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 7762101..da36db1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -47,6 +47,8 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * An operator that implements the logic for windowing based on a {@link WindowAssigner} and
  * {@link Trigger}.
@@ -130,11 +132,11 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 		super(windowFunction);
 
-		this.windowAssigner = windowAssigner;
-		this.keySelector = keySelector;
+		this.windowAssigner = requireNonNull(windowAssigner);
+		this.keySelector = requireNonNull(keySelector);
 
-		this.windowBufferFactory = windowBufferFactory;
-		this.triggerTemplate = trigger;
+		this.windowBufferFactory = requireNonNull(windowBufferFactory);
+		this.triggerTemplate = requireNonNull(trigger);
 
 		setChainingStrategy(ChainingStrategy.ALWAYS);
 //		forceInputCopy();

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a1cbc2/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
index 04352d8..f3d57bf 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
@@ -21,9 +21,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.examples.wordcount.WordCount;
 
 /**
@@ -73,9 +70,7 @@ public class WindowWordCount {
 		text.flatMap(new WordCount.Tokenizer())
 				// create windows of windowSize records slided every slideSize records
 				.keyBy(0)
-				.window(GlobalWindows.create())
-				.evictor(CountEvictor.of(windowSize))
-				.trigger(CountTrigger.of(slideSize))
+				.countWindow(windowSize, slideSize)
 				// group by the tuple field "0" and sum up tuple field "1"
 				.sum(1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a1cbc2/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 47dbf50..0565f52 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -587,6 +587,10 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * set using
    * [[StreamExecutionEnvironment.setStreamTimeCharacteristic]].
    *
+   * Note: This operation can be inherently non-parallel since all elements have to pass through
+   * the same operator instance. (Only in special cases, such as aligned time windows, is
+   * it possible to perform this operation in parallel.)
+   *
    * @param size The size of the window.
    */
   def timeWindowAll(size: AbstractTime): AllWindowedStream[T, TimeWindow] = {
@@ -602,6 +606,10 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * set using
    * [[StreamExecutionEnvironment.setStreamTimeCharacteristic]].
    *
+   * Note: This operation can be inherently non-parallel since all elements have to pass through
+   * the same operator instance. (Only in special cases, such as aligned time windows, is
+   * it possible to perform this operation in parallel.)
+   *
    * @param size The size of the window.
    */
   def timeWindowAll(size: AbstractTime, slide: AbstractTime): AllWindowedStream[T, TimeWindow] = {
@@ -612,6 +620,10 @@ class DataStream[T](javaStream: JavaStream[T]) {
   /**
    * Windows this [[DataStream]] into sliding count windows.
    *
+   * Note: This operation can be inherently non-parallel since all elements have to pass through
+   * the same operator instance. (Only in special cases, such as aligned time windows, is
+   * it possible to perform this operation in parallel.)
+   *
    * @param size The size of the windows in number of elements.
    * @param slide The slide interval in number of elements.
    */
@@ -622,6 +634,10 @@ class DataStream[T](javaStream: JavaStream[T]) {
   /**
    * Windows this [[DataStream]] into tumbling count windows.
    *
+   * Note: This operation can be inherently non-parallel since all elements have to pass through
+   * the same operator instance. (Only in special cases, such as aligned time windows, is
+   * it possible to perform this operation in parallel.)
+   *
    * @param size The size of the windows in number of elements.
    */
   def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow] = {