You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/24 13:08:45 UTC
flink git commit: [FLINK-5743] Mark WindowedStream.aggregate() methods as PublicEvolving
Repository: flink
Updated Branches:
refs/heads/master 6f3723e83 -> 5983069fc
[FLINK-5743] Mark WindowedStream.aggregate() methods as PublicEvolving
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5983069f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5983069f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5983069f
Branch: refs/heads/master
Commit: 5983069fc2a40492b514ffa53a508cd8992c6bf2
Parents: 6f3723e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Feb 24 11:27:08 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 24 14:00:25 2017 +0100
----------------------------------------------------------------------
.../api/datastream/AllWindowedStream.java | 4 +++
.../api/datastream/WindowedStream.java | 4 +++
.../streaming/api/scala/AllWindowedStream.scala | 19 ++++++++------
.../streaming/api/scala/WindowedStream.scala | 26 +++++++++++---------
4 files changed, 34 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5983069f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index f883ef5..742a2ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -303,6 +303,7 @@ public class AllWindowedStream<T, W extends Window> {
* @param <R> The type of the elements in the resulting stream, equal to the
* AggregateFunction's result type
*/
+ @PublicEvolving
public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
checkNotNull(function, "function");
@@ -331,6 +332,7 @@ public class AllWindowedStream<T, W extends Window> {
* @param <R> The type of the elements in the resulting stream, equal to the
* AggregateFunction's result type
*/
+ @PublicEvolving
public <ACC, R> SingleOutputStreamOperator<R> aggregate(
AggregateFunction<T, ACC, R> function,
TypeInformation<ACC> accumulatorType,
@@ -366,6 +368,7 @@ public class AllWindowedStream<T, W extends Window> {
* @param <R> The type of the elements in the resulting stream, equal to the
* WindowFunction's result type
*/
+ @PublicEvolving
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
AggregateFunction<T, ACC, V> aggFunction,
AllWindowFunction<V, R, W> windowFunction) {
@@ -405,6 +408,7 @@ public class AllWindowedStream<T, W extends Window> {
* @param <R> The type of the elements in the resulting stream, equal to the
* WindowFunction's result type
*/
+ @PublicEvolving
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
AggregateFunction<T, ACC, V> aggregateFunction,
AllWindowFunction<V, R, W> windowFunction,
http://git-wip-us.apache.org/repos/asf/flink/blob/5983069f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index b28434c..164e47e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -737,6 +737,7 @@ public class WindowedStream<T, K, W extends Window> {
* @param <R> The type of the elements in the resulting stream, equal to the
* AggregateFunction's result type
*/
+ @PublicEvolving
public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
checkNotNull(function, "function");
@@ -765,6 +766,7 @@ public class WindowedStream<T, K, W extends Window> {
* @param <R> The type of the elements in the resulting stream, equal to the
* AggregateFunction's result type
*/
+ @PublicEvolving
public <ACC, R> SingleOutputStreamOperator<R> aggregate(
AggregateFunction<T, ACC, R> function,
TypeInformation<ACC> accumulatorType,
@@ -800,6 +802,7 @@ public class WindowedStream<T, K, W extends Window> {
* @param <R> The type of the elements in the resulting stream, equal to the
* WindowFunction's result type
*/
+ @PublicEvolving
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
AggregateFunction<T, ACC, V> aggFunction,
WindowFunction<V, R, K, W> windowFunction) {
@@ -839,6 +842,7 @@ public class WindowedStream<T, K, W extends Window> {
* @param <R> The type of the elements in the resulting stream, equal to the
* WindowFunction's result type
*/
+ @PublicEvolving
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
AggregateFunction<T, ACC, V> aggregateFunction,
WindowFunction<V, R, K, W> windowFunction,
http://git-wip-us.apache.org/repos/asf/flink/blob/5983069f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 7f52252..cf062fc 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -209,8 +209,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @param aggregateFunction The aggregation function.
* @return The data stream that is the result of applying the fold function to the window.
*/
- def aggregate[ACC: TypeInformation, R: TypeInformation]
- (aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R] = {
+ @PublicEvolving
+ def aggregate[ACC: TypeInformation, R: TypeInformation](
+ aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R] = {
checkNotNull(aggregateFunction, "AggregationFunction must not be null")
@@ -232,9 +233,10 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @param windowFunction The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
- def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]
- (preAggregator: AggregateFunction[T, ACC, V],
- windowFunction: AllWindowFunction[V, R, W]): DataStream[R] = {
+ @PublicEvolving
+ def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
+ preAggregator: AggregateFunction[T, ACC, V],
+ windowFunction: AllWindowFunction[V, R, W]): DataStream[R] = {
checkNotNull(preAggregator, "AggregationFunction must not be null")
checkNotNull(windowFunction, "Window function must not be null")
@@ -264,9 +266,10 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @param windowFunction The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
- def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]
- (preAggregator: AggregateFunction[T, ACC, V],
- windowFunction: (W, Iterable[V], Collector[R]) => Unit): DataStream[R] = {
+ @PublicEvolving
+ def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
+ preAggregator: AggregateFunction[T, ACC, V],
+ windowFunction: (W, Iterable[V], Collector[R]) => Unit): DataStream[R] = {
checkNotNull(preAggregator, "AggregationFunction must not be null")
checkNotNull(windowFunction, "Window function must not be null")
http://git-wip-us.apache.org/repos/asf/flink/blob/5983069f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index a5fbeb9..32a9f60 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -258,8 +258,9 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param aggregateFunction The aggregation function.
* @return The data stream that is the result of applying the fold function to the window.
*/
- def aggregate[ACC: TypeInformation, R: TypeInformation]
- (aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R] = {
+ @PublicEvolving
+ def aggregate[ACC: TypeInformation, R: TypeInformation](
+ aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R] = {
val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
@@ -279,9 +280,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param windowFunction The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
- def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]
- (preAggregator: AggregateFunction[T, ACC, V],
- windowFunction: WindowFunction[V, R, K, W]): DataStream[R] = {
+ @PublicEvolving
+ def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
+ preAggregator: AggregateFunction[T, ACC, V],
+ windowFunction: WindowFunction[V, R, K, W]): DataStream[R] = {
val cleanedPreAggregator = clean(preAggregator)
val cleanedWindowFunction = clean(windowFunction)
@@ -308,9 +310,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param windowFunction The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
- def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]
- (preAggregator: AggregateFunction[T, ACC, V],
- windowFunction: (K, W, Iterable[V], Collector[R]) => Unit): DataStream[R] = {
+ @PublicEvolving
+ def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
+ preAggregator: AggregateFunction[T, ACC, V],
+ windowFunction: (K, W, Iterable[V], Collector[R]) => Unit): DataStream[R] = {
val cleanedPreAggregator = clean(preAggregator)
val cleanedWindowFunction = clean(windowFunction)
@@ -337,9 +340,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param windowFunction The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
- def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]
- (preAggregator: AggregateFunction[T, ACC, V],
- windowFunction: ProcessWindowFunction[V, R, K, W]): DataStream[R] = {
+ @PublicEvolving
+ def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
+ preAggregator: AggregateFunction[T, ACC, V],
+ windowFunction: ProcessWindowFunction[V, R, K, W]): DataStream[R] = {
val cleanedPreAggregator = clean(preAggregator)
val cleanedWindowFunction = clean(windowFunction)