You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/11 12:13:14 UTC
flink git commit: [hotfix] Add countWindow and countWindowAll shortcut
Repository: flink
Updated Branches:
refs/heads/master d38aeaceb -> 86080bb97
[hotfix] Add countWindow and countWindowAll shortcut
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86080bb9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86080bb9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86080bb9
Branch: refs/heads/master
Commit: 86080bb971c8d48a6a4225b41b30f80b0986a6aa
Parents: d38aeac
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun Oct 11 12:12:41 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sun Oct 11 12:12:41 2015 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 28 +++++++++++++++++++-
.../streaming/api/datastream/KeyedStream.java | 26 ++++++++++++++++++
.../flink/streaming/api/scala/DataStream.scala | 21 ++++++++++++++-
.../flink/streaming/api/scala/KeyedStream.scala | 24 ++++++++++++++---
4 files changed, 93 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/86080bb9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index f56e53b..42047b9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -59,10 +59,15 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator;
@@ -370,7 +375,8 @@ public class DataStream<T> {
* @see KeySelector
*/
public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
- return setConnectionType(new CustomPartitionerWrapper<K, T>(clean(partitioner), clean(keySelector)));
+ return setConnectionType(new CustomPartitionerWrapper<K, T>(clean(partitioner),
+ clean(keySelector)));
}
// private helper method for custom partitioning
@@ -653,6 +659,26 @@ public class DataStream<T> {
}
/**
+ * Windows this {@code DataStream} into tumbling count windows.
+ *
+ * @param size The size of the windows in number of elements.
+ */
+ public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
+ return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
+ }
+
+ /**
+ * Windows this {@code DataStream} into sliding count windows.
+ * @param size The size of the windows in number of elements.
+ * @param slide The slide interval in number of elements.
+ */
+ public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
+ return windowAll(GlobalWindows.create())
+ .evictor(CountEvictor.of(size))
+ .trigger(CountTrigger.of(slide));
+ }
+
+ /**
* Windows this data stream to a {@code KeyedTriggerWindowDataStream}, which evaluates windows
* over a key grouped stream. Elements are put into windows by a
* {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. The grouping of
http://git-wip-us.apache.org/repos/asf/flink/blob/86080bb9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 2e6d7d6..0da419c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -32,10 +32,15 @@ import org.apache.flink.streaming.api.operators.StreamGroupedFold;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
@@ -138,6 +143,27 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
}
/**
+ * Windows this {@code KeyedStream} into tumbling count windows.
+ *
+ * @param size The size of the windows in number of elements.
+ */
+ public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
+ return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
+ }
+
+ /**
+ * Windows this {@code KeyedStream} into sliding count windows.
+ *
+ * @param size The size of the windows in number of elements.
+ * @param slide The slide interval in number of elements.
+ */
+ public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
+ return window(GlobalWindows.create())
+ .evictor(CountEvictor.of(size))
+ .trigger(CountTrigger.of(slide));
+ }
+
+ /**
* Windows this data stream to a {@code WindowedStream}, which evaluates windows
* over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The
* grouping of elements is done both by key and by window.
http://git-wip-us.apache.org/repos/asf/flink/blob/86080bb9/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index afd0700..47dbf50 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, Ti
import org.apache.flink.streaming.api.scala.function.StatefulFunction
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.AbstractTime
-import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
+import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
import org.apache.flink.streaming.util.serialization.SerializationSchema
import org.apache.flink.util.Collector
@@ -610,6 +610,25 @@ class DataStream[T](javaStream: JavaStream[T]) {
}
/**
+ * Windows this [[DataStream]] into sliding count windows.
+ *
+ * @param size The size of the windows in number of elements.
+ * @param slide The slide interval in number of elements.
+ */
+ def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow] = {
+ new AllWindowedStream(javaStream.countWindowAll(size, slide))
+ }
+
+ /**
+ * Windows this [[DataStream]] into tumbling count windows.
+ *
+ * @param size The size of the windows in number of elements.
+ */
+ def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow] = {
+ new AllWindowedStream(javaStream.countWindowAll(size))
+ }
+
+ /**
* Windows this data stream to a [[AllWindowedStream]], which evaluates windows
* over a key grouped stream. Elements are put into windows by a [[WindowAssigner]]. The grouping
* of elements is done both by key and by window.
http://git-wip-us.apache.org/repos/asf/flink/blob/86080bb9/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index c605bb1..a588931 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator
import org.apache.flink.streaming.api.operators.StreamGroupedReduce
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.AbstractTime
-import org.apache.flink.streaming.api.windowing.windows.{Window, TimeWindow}
+import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window, TimeWindow}
import scala.reflect.ClassTag
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.functions.FoldFunction
@@ -41,7 +41,6 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
/**
* Windows this [[KeyedStream]] into tumbling time windows.
*
- * <p>
* This is a shortcut for either `.window(TumblingTimeWindows.of(size))` or
* `.window(TumblingProcessingTimeWindows.of(size))` depending on the time characteristic
* set using
@@ -55,9 +54,27 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
}
/**
+ * Windows this [[KeyedStream]] into sliding count windows.
+ *
+ * @param size The size of the windows in number of elements.
+ * @param slide The slide interval in number of elements.
+ */
+ def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow] = {
+ new WindowedStream(javaStream.countWindow(size, slide))
+ }
+
+ /**
+ * Windows this [[KeyedStream]] into tumbling count windows.
+ *
+ * @param size The size of the windows in number of elements.
+ */
+ def countWindow(size: Long): WindowedStream[T, K, GlobalWindow] = {
+ new WindowedStream(javaStream.countWindow(size))
+ }
+
+ /**
* Windows this [[KeyedStream]] into sliding time windows.
*
- * <p>
* This is a shortcut for either `.window(SlidingTimeWindows.of(size))` or
* `.window(SlidingProcessingTimeWindows.of(size))` depending on the time characteristic
* set using
@@ -75,7 +92,6 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* over a key grouped stream. Elements are put into windows by a [[WindowAssigner]]. The
* grouping of elements is done both by key and by window.
*
- * <p>
* A [[org.apache.flink.streaming.api.windowing.triggers.Trigger]] can be defined to specify
* when windows are evaluated. However, `WindowAssigner` have a default `Trigger`
* that is used if a `Trigger` is not specified.