You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/24 13:08:45 UTC

flink git commit: [FLINK-5743] Mark WindowedStream.aggregate() methods as PublicEvolving

Repository: flink
Updated Branches:
  refs/heads/master 6f3723e83 -> 5983069fc


[FLINK-5743] Mark WindowedStream.aggregate() methods as PublicEvolving


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5983069f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5983069f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5983069f

Branch: refs/heads/master
Commit: 5983069fc2a40492b514ffa53a508cd8992c6bf2
Parents: 6f3723e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Feb 24 11:27:08 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 24 14:00:25 2017 +0100

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |  4 +++
 .../api/datastream/WindowedStream.java          |  4 +++
 .../streaming/api/scala/AllWindowedStream.scala | 19 ++++++++------
 .../streaming/api/scala/WindowedStream.scala    | 26 +++++++++++---------
 4 files changed, 34 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5983069f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index f883ef5..742a2ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -303,6 +303,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param <R> The type of the elements in the resulting stream, equal to the
 	 *            AggregateFunction's result type   
 	 */
+	@PublicEvolving
 	public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
 		checkNotNull(function, "function");
 
@@ -331,6 +332,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param <R> The type of the elements in the resulting stream, equal to the
 	 *            AggregateFunction's result type  
 	 */
+	@PublicEvolving
 	public <ACC, R> SingleOutputStreamOperator<R> aggregate(
 			AggregateFunction<T, ACC, R> function,
 			TypeInformation<ACC> accumulatorType,
@@ -366,6 +368,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param <R> The type of the elements in the resulting stream, equal to the
 	 *            WindowFunction's result type
 	 */
+	@PublicEvolving
 	public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
 			AggregateFunction<T, ACC, V> aggFunction,
 			AllWindowFunction<V, R, W> windowFunction) {
@@ -405,6 +408,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param <R> The type of the elements in the resulting stream, equal to the
 	 *            WindowFunction's result type
 	 */
+	@PublicEvolving
 	public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
 			AggregateFunction<T, ACC, V> aggregateFunction,
 			AllWindowFunction<V, R, W> windowFunction,

http://git-wip-us.apache.org/repos/asf/flink/blob/5983069f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index b28434c..164e47e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -737,6 +737,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param <R> The type of the elements in the resulting stream, equal to the
 	 *            AggregateFunction's result type
 	 */
+	@PublicEvolving
 	public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
 		checkNotNull(function, "function");
 
@@ -765,6 +766,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param <R> The type of the elements in the resulting stream, equal to the
 	 *            AggregateFunction's result type
 	 */
+	@PublicEvolving
 	public <ACC, R> SingleOutputStreamOperator<R> aggregate(
 			AggregateFunction<T, ACC, R> function,
 			TypeInformation<ACC> accumulatorType,
@@ -800,6 +802,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param <R> The type of the elements in the resulting stream, equal to the
 	 *            WindowFunction's result type
 	 */
+	@PublicEvolving
 	public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
 			AggregateFunction<T, ACC, V> aggFunction,
 			WindowFunction<V, R, K, W> windowFunction) {
@@ -839,6 +842,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param <R> The type of the elements in the resulting stream, equal to the
 	 *            WindowFunction's result type
 	 */
+	@PublicEvolving
 	public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
 			AggregateFunction<T, ACC, V> aggregateFunction,
 			WindowFunction<V, R, K, W> windowFunction,

http://git-wip-us.apache.org/repos/asf/flink/blob/5983069f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 7f52252..cf062fc 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -209,8 +209,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * @param aggregateFunction The aggregation function.
    * @return The data stream that is the result of applying the fold function to the window.
    */
-  def aggregate[ACC: TypeInformation, R: TypeInformation]
-      (aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R] = {
+  @PublicEvolving
+  def aggregate[ACC: TypeInformation, R: TypeInformation](
+      aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R] = {
 
     checkNotNull(aggregateFunction, "AggregationFunction must not be null")
 
@@ -232,9 +233,10 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * @param windowFunction The window function.
    * @return The data stream that is the result of applying the window function to the window.
    */
-  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]
-      (preAggregator: AggregateFunction[T, ACC, V],
-       windowFunction: AllWindowFunction[V, R, W]): DataStream[R] = {
+  @PublicEvolving
+  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
+      preAggregator: AggregateFunction[T, ACC, V],
+      windowFunction: AllWindowFunction[V, R, W]): DataStream[R] = {
 
     checkNotNull(preAggregator, "AggregationFunction must not be null")
     checkNotNull(windowFunction, "Window function must not be null")
@@ -264,9 +266,10 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * @param windowFunction The window function.
    * @return The data stream that is the result of applying the window function to the window.
    */
-  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]
-      (preAggregator: AggregateFunction[T, ACC, V],
-       windowFunction: (W, Iterable[V], Collector[R]) => Unit): DataStream[R] = {
+  @PublicEvolving
+  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
+      preAggregator: AggregateFunction[T, ACC, V],
+      windowFunction: (W, Iterable[V], Collector[R]) => Unit): DataStream[R] = {
 
     checkNotNull(preAggregator, "AggregationFunction must not be null")
     checkNotNull(windowFunction, "Window function must not be null")

http://git-wip-us.apache.org/repos/asf/flink/blob/5983069f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index a5fbeb9..32a9f60 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -258,8 +258,9 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    * @param aggregateFunction The aggregation function.
    * @return The data stream that is the result of applying the fold function to the window.
    */
-  def aggregate[ACC: TypeInformation, R: TypeInformation]
-      (aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R] = {
+  @PublicEvolving
+  def aggregate[ACC: TypeInformation, R: TypeInformation](
+      aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R] = {
 
     val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
     val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
@@ -279,9 +280,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    * @param windowFunction The window function.
    * @return The data stream that is the result of applying the window function to the window.
    */
-  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]
-      (preAggregator: AggregateFunction[T, ACC, V],
-       windowFunction: WindowFunction[V, R, K, W]): DataStream[R] = {
+  @PublicEvolving
+  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
+      preAggregator: AggregateFunction[T, ACC, V],
+      windowFunction: WindowFunction[V, R, K, W]): DataStream[R] = {
 
     val cleanedPreAggregator = clean(preAggregator)
     val cleanedWindowFunction = clean(windowFunction)
@@ -308,9 +310,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    * @param windowFunction The window function.
    * @return The data stream that is the result of applying the window function to the window.
    */
-  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]
-      (preAggregator: AggregateFunction[T, ACC, V],
-       windowFunction: (K, W, Iterable[V], Collector[R]) => Unit): DataStream[R] = {
+  @PublicEvolving
+  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
+      preAggregator: AggregateFunction[T, ACC, V],
+      windowFunction: (K, W, Iterable[V], Collector[R]) => Unit): DataStream[R] = {
 
     val cleanedPreAggregator = clean(preAggregator)
     val cleanedWindowFunction = clean(windowFunction)
@@ -337,9 +340,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     * @param windowFunction The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
-  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]
-  (preAggregator: AggregateFunction[T, ACC, V],
-   windowFunction: ProcessWindowFunction[V, R, K, W]): DataStream[R] = {
+  @PublicEvolving
+  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
+      preAggregator: AggregateFunction[T, ACC, V],
+      windowFunction: ProcessWindowFunction[V, R, K, W]): DataStream[R] = {
 
     val cleanedPreAggregator = clean(preAggregator)
     val cleanedWindowFunction = clean(windowFunction)