You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/30 13:28:26 UTC

[2/2] flink git commit: [FLINK-4271] [DataStream API] Enable CoGroupedStreams and JoinedStreams to set parallelism.

[FLINK-4271] [DataStream API] Enable CoGroupedStreams and JoinedStreams to set parallelism.

This closes #2305


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a20229b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a20229b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a20229b

Branch: refs/heads/master
Commit: 2a20229b56997ff1249a879acfb2bcca4323b0a1
Parents: 96b6359
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Thu Jul 28 14:32:13 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Aug 30 15:27:53 2016 +0200

----------------------------------------------------------------------
 .../api/datastream/CoGroupedStreams.java        | 34 ++++++++++
 .../streaming/api/datastream/JoinedStreams.java | 65 ++++++++++++++++++++
 2 files changed, 99 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2a20229b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index a9a64af..0bab4cf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -243,6 +243,22 @@ public class CoGroupedStreams<T1, T2> {
 		/**
 		 * Completes the co-group operation with the user function that is executed
 		 * for windowed groups.
+		 *
+		 * <p>
+		 *     Note: This is a temporary workaround while the {@link #apply(CoGroupFunction)} method has the wrong return type.
+		 * </p>
+		 * @deprecated This method will be replaced by {@link #apply(CoGroupFunction)} in Flink 2.0.
+		 * So use the {@link #apply(CoGroupFunction)} in the future.
+         */
+		@PublicEvolving
+		@Deprecated
+		public <T> SingleOutputStreamOperator<T> with(CoGroupFunction<T1, T2, T> function) {
+			return (SingleOutputStreamOperator<T>) apply(function);
+		}
+
+		/**
+		 * Completes the co-group operation with the user function that is executed
+		 * for windowed groups.
 		 */
 		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
 			//clean the closure
@@ -253,9 +269,11 @@ public class CoGroupedStreams<T1, T2> {
 			
 			DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
 					.map(new Input1Tagger<T1, T2>())
+					.setParallelism(input1.getParallelism())
 					.returns(unionType);
 			DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
 					.map(new Input2Tagger<T1, T2>())
+					.setParallelism(input2.getParallelism())
 					.returns(unionType);
 
 			DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
@@ -274,6 +292,22 @@ public class CoGroupedStreams<T1, T2> {
 
 			return windowOp.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
 		}
+
+		/**
+		 * Completes the co-group operation with the user function that is executed
+		 * for windowed groups.
+		 *
+		 * <p>
+		 *     Note: This is a temporary workaround while the {@link #apply(CoGroupFunction, TypeInformation)} method has the wrong return type.
+		 * </p>
+		 * @deprecated This method will be replaced by {@link #apply(CoGroupFunction, TypeInformation)} in Flink 2.0.
+		 * So use the {@link #apply(CoGroupFunction, TypeInformation)} in the future.
+		 */
+		@PublicEvolving
+		@Deprecated
+		public <T> SingleOutputStreamOperator<T> with(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
+			return (SingleOutputStreamOperator<T>) apply(function, resultType);
+		}
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2a20229b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
index 86c6226..8f8fc67 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -237,6 +237,22 @@ public class JoinedStreams<T1, T2> {
 		/**
 		 * Completes the join operation with the user function that is executed
 		 * for each combination of elements with the same key in a window.
+		 *
+		 * <p>
+		 *     Note: This is a temporary workaround while the {@link #apply(JoinFunction)} method has the wrong return type.
+		 * </p>
+		 * @deprecated This method will be replaced by {@link #apply(JoinFunction)} in Flink 2.0.
+		 * So use the {@link #apply(JoinFunction)} in the future.
+		 */
+		@PublicEvolving
+		@Deprecated
+		public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function) {
+			return (SingleOutputStreamOperator<T>) apply(function);
+		}
+
+		/**
+		 * Completes the join operation with the user function that is executed
+		 * for each combination of elements with the same key in a window.
 		 */
 		public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
 			//clean the closure
@@ -252,6 +268,23 @@ public class JoinedStreams<T1, T2> {
 
 		}
 
+
+		/**
+		 * Completes the join operation with the user function that is executed
+		 * for each combination of elements with the same key in a window.
+		 *
+		 * <p>
+		 *     Note: This is a temporary workaround while the {@link #apply(FlatJoinFunction, TypeInformation)} method has the wrong return type.
+		 * </p>
+		 * @deprecated This method will be replaced by {@link #apply(FlatJoinFunction, TypeInformation)} in Flink 2.0.
+		 * So use the {@link #apply(FlatJoinFunction, TypeInformation)} in the future.
+		 */
+		@PublicEvolving
+		@Deprecated
+		public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
+			return (SingleOutputStreamOperator<T>) apply(function, resultType);
+		}
+
 		/**
 		 * Completes the join operation with the user function that is executed
 		 * for each combination of elements with the same key in a window.
@@ -273,6 +306,22 @@ public class JoinedStreams<T1, T2> {
 		/**
 		 * Completes the join operation with the user function that is executed
 		 * for each combination of elements with the same key in a window.
+		 *
+		 * <p>
+		 *     Note: This is a temporary workaround while the {@link #apply(FlatJoinFunction)} method has the wrong return type.
+		 * </p>
+		 * @deprecated This method will be replaced by {@link #apply(FlatJoinFunction)} in Flink 2.0.
+		 * So use the {@link #apply(FlatJoinFunction)} in the future.
+		 */
+		@PublicEvolving
+		@Deprecated
+		public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function) {
+			return (SingleOutputStreamOperator<T>) apply(function);
+		}
+
+		/**
+		 * Completes the join operation with the user function that is executed
+		 * for each combination of elements with the same key in a window.
 		 */
 		public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
 			//clean the closure
@@ -287,6 +336,22 @@ public class JoinedStreams<T1, T2> {
 					.apply(new JoinCoGroupFunction<>(function), resultType);
 
 		}
+
+		/**
+		 * Completes the join operation with the user function that is executed
+		 * for each combination of elements with the same key in a window.
+		 *
+		 * <p>
+		 *     Note: This is a temporary workaround while the {@link #apply(JoinFunction, TypeInformation)} method has the wrong return type.
+		 * </p>
+		 * @deprecated This method will be replaced by {@link #apply(JoinFunction, TypeInformation)} in Flink 2.0.
+		 * So use the {@link #apply(JoinFunction, TypeInformation)} in the future.
+		 */
+		@PublicEvolving
+		@Deprecated
+		public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
+			return (SingleOutputStreamOperator<T>) apply(function, resultType);
+		}
 	}
 	
 	// ------------------------------------------------------------------------