You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:16:46 UTC

[10/10] flink git commit: [hotfix] Change result of WindowedStream ops to SingleOutputStreamOperator

[hotfix] Change result of WindowedStream ops to SingleOutputStreamOperator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b2b2781f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b2b2781f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b2b2781f

Branch: refs/heads/master
Commit: b2b2781fd04a14807849fdc41e4b5ecb8ab75f13
Parents: 0ee0c1f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Oct 8 11:32:18 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 9 11:15:59 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       | 46 +++++++++----------
 .../api/datastream/WindowedStream.java          | 48 ++++++++++----------
 2 files changed, 47 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b2b2781f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index c7a70d7..83e7adc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -126,14 +126,14 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param function The reduce function.
 	 * @return The data stream that is the result of applying the reduce function to the window. 
 	 */
-	public DataStream<T> reduce(ReduceFunction<T> function) {
+	public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) {
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
 
 		String callLocation = Utils.getCallLocationName();
 		String udfName = "Reduce at " + callLocation;
 
-		DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
+		SingleOutputStreamOperator<T, ?> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
 		if (result != null) {
 			return result;
 		}
@@ -173,7 +173,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param function The fold function.
 	 * @return The data stream that is the result of applying the fold function to the window.
 	 */
-	public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> function) {
+	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function) {
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
 
@@ -191,7 +191,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param function The fold function.
 	 * @return The data stream that is the result of applying the fold function to the window.
 	 */
-	public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
+	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
 		return apply(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType);
@@ -208,7 +208,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
-	public <R> DataStream<R> apply(AllWindowFunction<T, R, W> function) {
+	public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function) {
 		TypeInformation<T> inType = input.getType();
 		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
 				function, AllWindowFunction.class, true, true, inType, null, false);
@@ -227,14 +227,14 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
-	public <R> DataStream<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+	public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
 
 		String callLocation = Utils.getCallLocationName();
 		String udfName = "MapWindow at " + callLocation;
 
-		DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
+		SingleOutputStreamOperator<R, ?> result = createFastTimeOperatorIfValid(function, resultType, udfName);
 		if (result != null) {
 			return result;
 		}
@@ -274,7 +274,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param positionToSum The position in the tuple/array to sum
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> sum(int positionToSum) {
+	public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
 		return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
 	}
 
@@ -291,7 +291,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param field The field to sum
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> sum(String field) {
+	public SingleOutputStreamOperator<T, ?> sum(String field) {
 		return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
 	}
 
@@ -302,7 +302,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param positionToMin The position to minimize
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> min(int positionToMin) {
+	public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
 		return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
 	}
 
@@ -319,7 +319,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param field The field expression based on which the aggregation will be applied.
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> min(String field) {
+	public SingleOutputStreamOperator<T, ?> min(String field) {
 		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
 	}
 
@@ -332,7 +332,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 *            The position to minimize by
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> minBy(int positionToMinBy) {
+	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
 		return this.minBy(positionToMinBy, true);
 	}
 
@@ -344,7 +344,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param positionToMinBy The position to minimize by
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> minBy(String positionToMinBy) {
+	public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
 		return this.minBy(positionToMinBy, true);
 	}
 
@@ -358,7 +358,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param first If true, then the operator return the first element with the minimum value, otherwise returns the last
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> minBy(int positionToMinBy, boolean first) {
+	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
 		return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
 	}
 
@@ -373,7 +373,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param first If True then in case of field equality the first object will be returned
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> minBy(String field, boolean first) {
+	public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
 		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
 	}
 
@@ -384,7 +384,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param positionToMax The position to maximize
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> max(int positionToMax) {
+	public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
 		return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
 	}
 
@@ -398,7 +398,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param field The field expression based on which the aggregation will be applied.
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> max(String field) {
+	public SingleOutputStreamOperator<T, ?> max(String field) {
 		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
 	}
 
@@ -411,7 +411,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 *            The position to maximize by
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> maxBy(int positionToMaxBy) {
+	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
 		return this.maxBy(positionToMaxBy, true);
 	}
 
@@ -424,7 +424,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 *            The position to maximize by
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> maxBy(String positionToMaxBy) {
+	public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
 		return this.maxBy(positionToMaxBy, true);
 	}
 
@@ -438,7 +438,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param first If true, then the operator return the first element with the maximum value, otherwise returns the last
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> maxBy(int positionToMaxBy, boolean first) {
+	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
 		return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
 	}
 
@@ -453,11 +453,11 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param first If True then in case of field equality the first object will be returned
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> maxBy(String field, boolean first) {
+	public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
 		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
 	}
 
-	private DataStream<T> aggregate(AggregationFunction<T> aggregator) {
+	private SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregator) {
 		return reduce(aggregator);
 	}
 
@@ -466,7 +466,7 @@ public class AllWindowedStream<T, W extends Window> {
 	// ------------------------------------------------------------------------
 
 
-	private <R> DataStream<R> createFastTimeOperatorIfValid(
+	private <R> SingleOutputStreamOperator<R, ?> createFastTimeOperatorIfValid(
 			Function function,
 			TypeInformation<R> resultType,
 			String functionName) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b2b2781f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 42e0bd7..1b511d8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -137,14 +137,14 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param function The reduce function.
 	 * @return The data stream that is the result of applying the reduce function to the window. 
 	 */
-	public DataStream<T> reduce(ReduceFunction<T> function) {
+	public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) {
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
 
 		String callLocation = Utils.getCallLocationName();
 		String udfName = "Reduce at " + callLocation;
 
-		DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
+		SingleOutputStreamOperator<T, ?> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
 		if (result != null) {
 			return result;
 		}
@@ -187,7 +187,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param function The fold function.
 	 * @return The data stream that is the result of applying the fold function to the window.
 	 */
-	public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> function) {
+	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function) {
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
 
@@ -205,7 +205,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param function The fold function.
 	 * @return The data stream that is the result of applying the fold function to the window.
 	 */
-	public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
+	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
 		return apply(new FoldWindowFunction<K, W, T, R>(initialValue, function), resultType);
@@ -223,7 +223,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
-	public <R> DataStream<R> apply(WindowFunction<T, R, K, W> function) {
+	public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, K, W> function) {
 		TypeInformation<T> inType = input.getType();
 		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
 				function, WindowFunction.class, true, true, inType, null, false);
@@ -243,14 +243,14 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
-	public <R> DataStream<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
+	public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
 
 		String callLocation = Utils.getCallLocationName();
 		String udfName = "MapWindow at " + callLocation;
 
-		DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
+		SingleOutputStreamOperator<R, ?> result = createFastTimeOperatorIfValid(function, resultType, udfName);
 		if (result != null) {
 			return result;
 		}
@@ -276,7 +276,7 @@ public class WindowedStream<T, K, W extends Window> {
 					keySel,
 					new HeapWindowBuffer.Factory<T>(),
 					function,
-					trigger).enableSetProcessingTime(setProcessingTime);;
+					trigger).enableSetProcessingTime(setProcessingTime);
 		}
 
 		return input.transform(opName, resultType, operator);
@@ -293,7 +293,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param positionToSum The position in the tuple/array to sum
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> sum(int positionToSum) {
+	public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
 		return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
 	}
 
@@ -310,7 +310,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param field The field to sum
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> sum(String field) {
+	public SingleOutputStreamOperator<T, ?> sum(String field) {
 		return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
 	}
 
@@ -321,7 +321,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param positionToMin The position to minimize
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> min(int positionToMin) {
+	public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
 		return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
 	}
 
@@ -338,7 +338,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param field The field expression based on which the aggregation will be applied.
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> min(String field) {
+	public SingleOutputStreamOperator<T, ?> min(String field) {
 		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
 	}
 
@@ -351,7 +351,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 *            The position to minimize by
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> minBy(int positionToMinBy) {
+	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
 		return this.minBy(positionToMinBy, true);
 	}
 
@@ -363,7 +363,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param positionToMinBy The position to minimize by
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> minBy(String positionToMinBy) {
+	public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
 		return this.minBy(positionToMinBy, true);
 	}
 
@@ -377,7 +377,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param first If true, then the operator return the first element with the minimum value, otherwise returns the last
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> minBy(int positionToMinBy, boolean first) {
+	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
 		return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
 	}
 
@@ -392,7 +392,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param first If True then in case of field equality the first object will be returned
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> minBy(String field, boolean first) {
+	public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
 		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
 	}
 
@@ -403,7 +403,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param positionToMax The position to maximize
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> max(int positionToMax) {
+	public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
 		return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
 	}
 
@@ -417,7 +417,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param field The field expression based on which the aggregation will be applied.
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> max(String field) {
+	public SingleOutputStreamOperator<T, ?> max(String field) {
 		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
 	}
 
@@ -430,7 +430,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 *            The position to maximize by
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> maxBy(int positionToMaxBy) {
+	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
 		return this.maxBy(positionToMaxBy, true);
 	}
 
@@ -443,7 +443,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 *            The position to maximize by
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> maxBy(String positionToMaxBy) {
+	public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
 		return this.maxBy(positionToMaxBy, true);
 	}
 
@@ -457,7 +457,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param first If true, then the operator return the first element with the maximum value, otherwise returns the last
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> maxBy(int positionToMaxBy, boolean first) {
+	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
 		return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
 	}
 
@@ -472,11 +472,11 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param first If True then in case of field equality the first object will be returned
 	 * @return The transformed DataStream.
 	 */
-	public DataStream<T> maxBy(String field, boolean first) {
+	public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
 		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
 	}
 
-	private DataStream<T> aggregate(AggregationFunction<T> aggregator) {
+	private SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregator) {
 		return reduce(aggregator);
 	}
 
@@ -484,7 +484,7 @@ public class WindowedStream<T, K, W extends Window> {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private <R> DataStream<R> createFastTimeOperatorIfValid(
+	private <R> SingleOutputStreamOperator<R, ?> createFastTimeOperatorIfValid(
 			Function function,
 			TypeInformation<R> resultType,
 			String functionName) {