You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/11 12:13:14 UTC

flink git commit: [hotfix] Add countWindow and countWindowAll shortcut

Repository: flink
Updated Branches:
  refs/heads/master d38aeaceb -> 86080bb97


[hotfix] Add countWindow and countWindowAll shortcut


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86080bb9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86080bb9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86080bb9

Branch: refs/heads/master
Commit: 86080bb971c8d48a6a4225b41b30f80b0986a6aa
Parents: d38aeac
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun Oct 11 12:12:41 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sun Oct 11 12:12:41 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    | 28 +++++++++++++++++++-
 .../streaming/api/datastream/KeyedStream.java   | 26 ++++++++++++++++++
 .../flink/streaming/api/scala/DataStream.scala  | 21 ++++++++++++++-
 .../flink/streaming/api/scala/KeyedStream.scala | 24 ++++++++++++++---
 4 files changed, 93 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/86080bb9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index f56e53b..42047b9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -59,10 +59,15 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator;
@@ -370,7 +375,8 @@ public class DataStream<T> {
 	 * @see KeySelector
 	 */
 	public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
-		return setConnectionType(new CustomPartitionerWrapper<K, T>(clean(partitioner), clean(keySelector)));
+		return setConnectionType(new CustomPartitionerWrapper<K, T>(clean(partitioner),
+				clean(keySelector)));
 	}
 
 	//	private helper method for custom partitioning
@@ -653,6 +659,26 @@ public class DataStream<T> {
 	}
 
 	/**
+	 * Windows this {@code DataStream} into tumbling count windows.
+	 *
+	 * @param size The size of the windows in number of elements.
+	 */
+	public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
+		return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
+	}
+
+	/**
+	 * Windows this {@code DataStream} into sliding count windows.
+	 * @param size The size of the windows in number of elements.
+	 * @param slide The slide interval in number of elements.
+	 */
+	public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
+		return windowAll(GlobalWindows.create())
+				.evictor(CountEvictor.of(size))
+				.trigger(CountTrigger.of(slide));
+	}
+
+	/**
 	 * Windows this data stream to a {@code KeyedTriggerWindowDataStream}, which evaluates windows
 	 * over a key grouped stream. Elements are put into windows by a
 	 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. The grouping of

http://git-wip-us.apache.org/repos/asf/flink/blob/86080bb9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 2e6d7d6..0da419c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -32,10 +32,15 @@ import org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
@@ -138,6 +143,27 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
+	 * Windows this {@code KeyedStream} into tumbling count windows.
+	 *
+	 * @param size The size of the windows in number of elements.
+	 */
+	public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
+		return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
+	}
+
+	/**
+	 * Windows this {@code KeyedStream} into sliding count windows.
+	 *
+	 * @param size The size of the windows in number of elements.
+	 * @param slide The slide interval in number of elements.
+	 */
+	public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
+		return window(GlobalWindows.create())
+				.evictor(CountEvictor.of(size))
+				.trigger(CountTrigger.of(slide));
+	}
+
+	/**
 	 * Windows this data stream to a {@code WindowedStream}, which evaluates windows
 	 * over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The
 	 * grouping of elements is done both by key and by window.

http://git-wip-us.apache.org/repos/asf/flink/blob/86080bb9/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index afd0700..47dbf50 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, Ti
 import org.apache.flink.streaming.api.scala.function.StatefulFunction
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.AbstractTime
-import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
+import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
 import org.apache.flink.streaming.util.serialization.SerializationSchema
 import org.apache.flink.util.Collector
 
@@ -610,6 +610,25 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
+   * Windows this [[DataStream]] into sliding count windows.
+   *
+   * @param size The size of the windows in number of elements.
+   * @param slide The slide interval in number of elements.
+   */
+  def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow] = {
+    new AllWindowedStream(javaStream.countWindowAll(size, slide))
+  }
+
+  /**
+   * Windows this [[DataStream]] into tumbling count windows.
+   *
+   * @param size The size of the windows in number of elements.
+   */
+  def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow] = {
+    new AllWindowedStream(javaStream.countWindowAll(size))
+  }
+
+  /**
    * Windows this data stream to a [[AllWindowedStream]], which evaluates windows
    * over a key grouped stream. Elements are put into windows by a [[WindowAssigner]]. The grouping
    * of elements is done both by key and by window.

http://git-wip-us.apache.org/repos/asf/flink/blob/86080bb9/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index c605bb1..a588931 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.AbstractTime
-import org.apache.flink.streaming.api.windowing.windows.{Window, TimeWindow}
+import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window, TimeWindow}
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.functions.FoldFunction
@@ -41,7 +41,6 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
   /**
    * Windows this [[KeyedStream]] into tumbling time windows.
    *
-   * <p>
    * This is a shortcut for either `.window(TumblingTimeWindows.of(size))` or
    * `.window(TumblingProcessingTimeWindows.of(size))` depending on the time characteristic
    * set using
@@ -55,9 +54,27 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
   }
 
   /**
+   * Windows this [[KeyedStream]] into sliding count windows.
+   *
+   * @param size The size of the windows in number of elements.
+   * @param slide The slide interval in number of elements.
+   */
+  def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow] = {
+    new WindowedStream(javaStream.countWindow(size, slide))
+  }
+
+  /**
+   * Windows this [[KeyedStream]] into tumbling count windows.
+   *
+   * @param size The size of the windows in number of elements.
+   */
+  def countWindow(size: Long): WindowedStream[T, K, GlobalWindow] = {
+    new WindowedStream(javaStream.countWindow(size))
+  }
+
+  /**
    * Windows this [[KeyedStream]] into sliding time windows.
    *
-   * <p>
    * This is a shortcut for either `.window(SlidingTimeWindows.of(size))` or
    * `.window(SlidingProcessingTimeWindows.of(size))` depending on the time characteristic
    * set using
@@ -75,7 +92,6 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    * over a key grouped stream. Elements are put into windows by a [[WindowAssigner]]. The
    * grouping of elements is done both by key and by window.
    *
-   * <p>
    * A [[org.apache.flink.streaming.api.windowing.triggers.Trigger]] can be defined to specify
    * when windows are evaluated. However, `WindowAssigner` have a default `Trigger`
    * that is used if a `Trigger` is not specified.