Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:46 UTC

[12/13] flink git commit: [FLINK-2550] Rename IterativeDataStream to IterativeStream

[FLINK-2550] Rename IterativeDataStream to IterativeStream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b6e762f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b6e762f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b6e762f

Branch: refs/heads/master
Commit: 7b6e762fda09e5edb1d5cca2398cb8035d6794b4
Parents: 0de9d2e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Oct 5 11:42:53 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 5 16:36:35 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  28 +--
 .../api/datastream/IterativeDataStream.java     | 218 -------------------
 .../api/datastream/IterativeStream.java         | 218 +++++++++++++++++++
 .../apache/flink/streaming/api/IterateTest.java |  38 ++--
 .../api/complex/ComplexIntegrationTest.java     |   4 +-
 .../examples/iteration/IterateExample.java      |   4 +-
 6 files changed, 255 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b6e762f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 32d9012..003ef36 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -467,14 +467,14 @@ public class DataStream<T> {
 	/**
 	 * Initiates an iterative part of the program that feeds back data streams.
 	 * The iterative part needs to be closed by calling
-	 * {@link IterativeDataStream#closeWith(DataStream)}. The transformation of
-	 * this IterativeDataStream will be the iteration head. The data stream
-	 * given to the {@link IterativeDataStream#closeWith(DataStream)} method is
+	 * {@link IterativeStream#closeWith(DataStream)}. The transformation of
+	 * this IterativeStream will be the iteration head. The data stream
+	 * given to the {@link IterativeStream#closeWith(DataStream)} method is
 	 * the data stream that will be fed back and used as the input for the
 	 * iteration head. The user can also use a different feedback type than the
 	 * input of the iteration and treat the input and feedback streams as a
 	 * {@link ConnectedStreams} by calling
-	 * {@link IterativeDataStream#withFeedbackType(TypeInformation)}
+	 * {@link IterativeStream#withFeedbackType(TypeInformation)}
 	 * <p>
 	 * A common usage pattern for streaming iterations is to use output
 	 * splitting to send a part of the closing data stream to the head. Refer to
@@ -482,7 +482,7 @@ public class DataStream<T> {
 	 * <p>
 	 * The iteration edge will be partitioned the same way as the first input of
 	 * the iteration head unless it is changed in the
-	 * {@link IterativeDataStream#closeWith(DataStream)} call.
+	 * {@link IterativeStream#closeWith(DataStream)} call.
 	 * <p>
 	 * By default a DataStream with iteration will never terminate, but the user
 	 * can use the maxWaitTime parameter to set a max waiting time for the
@@ -491,21 +491,21 @@ public class DataStream<T> {
 	 * 
 	 * @return The iterative data stream created.
 	 */
-	public IterativeDataStream<T> iterate() {
-		return new IterativeDataStream<T>(this, 0);
+	public IterativeStream<T> iterate() {
+		return new IterativeStream<T>(this, 0);
 	}
 
 	/**
 	 * Initiates an iterative part of the program that feeds back data streams.
 	 * The iterative part needs to be closed by calling
-	 * {@link IterativeDataStream#closeWith(DataStream)}. The transformation of
-	 * this IterativeDataStream will be the iteration head. The data stream
-	 * given to the {@link IterativeDataStream#closeWith(DataStream)} method is
+	 * {@link IterativeStream#closeWith(DataStream)}. The transformation of
+	 * this IterativeStream will be the iteration head. The data stream
+	 * given to the {@link IterativeStream#closeWith(DataStream)} method is
 	 * the data stream that will be fed back and used as the input for the
 	 * iteration head. The user can also use a different feedback type than the
 	 * input of the iteration and treat the input and feedback streams as a
 	 * {@link ConnectedStreams} by calling
-	 * {@link IterativeDataStream#withFeedbackType(TypeInformation)}
+	 * {@link IterativeStream#withFeedbackType(TypeInformation)}
 	 * <p>
 	 * A common usage pattern for streaming iterations is to use output
 	 * splitting to send a part of the closing data stream to the head. Refer to
@@ -513,7 +513,7 @@ public class DataStream<T> {
 	 * <p>
 	 * The iteration edge will be partitioned the same way as the first input of
 	 * the iteration head unless it is changed in the
-	 * {@link IterativeDataStream#closeWith(DataStream)} call.
+	 * {@link IterativeStream#closeWith(DataStream)} call.
 	 * <p>
 	 * By default a DataStream with iteration will never terminate, but the user
 	 * can use the maxWaitTime parameter to set a max waiting time for the
@@ -526,8 +526,8 @@ public class DataStream<T> {
 	 * 
 	 * @return The iterative data stream created.
 	 */
-	public IterativeDataStream<T> iterate(long maxWaitTimeMillis) {
-		return new IterativeDataStream<T>(this, maxWaitTimeMillis);
+	public IterativeStream<T> iterate(long maxWaitTimeMillis) {
+		return new IterativeStream<T>(this, maxWaitTimeMillis);
 	}
 
 	/**

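The iterate()/closeWith() pair documented above is used roughly as follows. This is a
minimal sketch against the renamed API; the sequence source, the threshold of 100, and
the class name are illustrative and not taken from this commit:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterateSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Long> source = env.generateSequence(0, 10);

		// iterate() now returns IterativeStream instead of IterativeDataStream
		IterativeStream<Long> iteration = source.iterate(5000); // give up after 5s without feedback

		// the iteration body: increment every element
		DataStream<Long> body = iteration.map(new MapFunction<Long, Long>() {
			@Override
			public Long map(Long value) {
				return value + 1;
			}
		});

		// feed back the elements that still need more iterations ...
		iteration.closeWith(body.filter(new FilterFunction<Long>() {
			@Override
			public boolean filter(Long value) {
				return value < 100;
			}
		}));

		// ... and emit the finished ones downstream
		body.filter(new FilterFunction<Long>() {
			@Override
			public boolean filter(Long value) {
				return value >= 100;
			}
		}).print();

		env.execute("Iteration sketch");
	}
}
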
http://git-wip-us.apache.org/repos/asf/flink/blob/7b6e762f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
deleted file mode 100644
index 75216ec..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
-import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
-import org.apache.flink.streaming.api.transformations.StreamTransformation;
-
-import java.util.Collection;
-
-/**
- * The iterative data stream represents the start of an iteration in a {@link DataStream}.
- * 
- * @param <T> Type of the elements in this Stream
- */
-public class IterativeDataStream<T> extends SingleOutputStreamOperator<T, IterativeDataStream<T>> {
-
-	// We store these so that we can create a co-iteration if we need to
-	private DataStream<T> originalInput;
-	private long maxWaitTime;
-	
-	protected IterativeDataStream(DataStream<T> dataStream, long maxWaitTime) {
-		super(dataStream.getExecutionEnvironment(),
-				new FeedbackTransformation<T>(dataStream.getTransformation(), maxWaitTime));
-		this.originalInput = dataStream;
-		this.maxWaitTime = maxWaitTime;
-		setBufferTimeout(dataStream.environment.getBufferTimeout());
-	}
-
-	/**
-	 * Closes the iteration. This method defines the end of the iterative
-	 * program part that will be fed back to the start of the iteration.
-	 *
-	 * <p>
-	 * A common usage pattern for streaming iterations is to use output
-	 * splitting to send a part of the closing data stream to the head. Refer to
-	 * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
-	 * for more information.
-	 * 
-	 * @param feedbackStream
-	 *            {@link DataStream} that will be used as input to the iteration
-	 *            head.
-	 *
-	 * @return The feedback stream.
-	 * 
-	 */
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public DataStream<T> closeWith(DataStream<T> feedbackStream) {
-
-		Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
-
-		if (!predecessors.contains(this.transformation)) {
-			throw new UnsupportedOperationException(
-					"Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
-		}
-
-		((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation());
-
-		return feedbackStream;
-	}
-
-	/**
-	 * Changes the feedback type of the iteration and allows the user to apply
-	 * co-transformations on the input and feedback stream, as in a
-	 * {@link ConnectedStreams}.
-	 *
-	 * <p>
-	 * For type safety the user needs to define the feedback type
-	 * 
-	 * @param feedbackTypeString
-	 *            String describing the type information of the feedback stream.
-	 * @return A {@link ConnectedIterativeDataStreams}.
-	 */
-	public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(String feedbackTypeString) {
-		return withFeedbackType(TypeInfoParser.<F> parse(feedbackTypeString));
-	}
-
-	/**
-	 * Changes the feedback type of the iteration and allows the user to apply
-	 * co-transformations on the input and feedback stream, as in a
-	 * {@link ConnectedStreams}.
-	 *
-	 * <p>
-	 * For type safety the user needs to define the feedback type
-	 * 
-	 * @param feedbackTypeClass
-	 *            Class of the elements in the feedback stream.
-	 * @return A {@link ConnectedIterativeDataStreams}.
-	 */
-	public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
-		return withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass));
-	}
-
-	/**
-	 * Changes the feedback type of the iteration and allows the user to apply
-	 * co-transformations on the input and feedback stream, as in a
-	 * {@link ConnectedStreams}.
-	 *
-	 * <p>
-	 * For type safety the user needs to define the feedback type
-	 * 
-	 * @param feedbackType
-	 *            The type information of the feedback stream.
-	 * @return A {@link ConnectedIterativeDataStreams}.
-	 */
-	public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
-		return new ConnectedIterativeDataStreams<T, F>(originalInput, feedbackType, maxWaitTime);
-	}
-	
-	/**
-	 * The {@link ConnectedIterativeDataStreams} represent a start of an
-	 * iterative part of a streaming program, where the original input of the
-	 * iteration and the feedback of the iteration are connected as in a
-	 * {@link ConnectedStreams}.
-	 *
-	 * <p>
-	 * The user can distinguish between the two inputs using co-transformation,
-	 * thus eliminating the need for mapping the inputs and outputs to a common
-	 * type.
-	 * 
-	 * @param <I>
-	 *            Type of the input of the iteration
-	 * @param <F>
-	 *            Type of the feedback of the iteration
-	 */
-	public static class ConnectedIterativeDataStreams<I, F> extends ConnectedStreams<I, F> {
-
-		private CoFeedbackTransformation<F> coFeedbackTransformation;
-
-		public ConnectedIterativeDataStreams(DataStream<I> input,
-				TypeInformation<F> feedbackType,
-				long waitTime) {
-			super(input.getExecutionEnvironment(),
-					input,
-					new DataStream<F>(input.getExecutionEnvironment(),
-							new CoFeedbackTransformation<F>(input.getParallelism(),
-									feedbackType,
-									waitTime)));
-			this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecondInput().getTransformation();
-		}
-
-		/**
-		 * Closes the iteration. This method defines the end of the iterative
-		 * program part that will be fed back to the start of the iteration as
-		 * the second input in the {@link ConnectedStreams}.
-		 * 
-		 * @param feedbackStream
-		 *            {@link DataStream} that will be used as second input to
-		 *            the iteration head.
-		 * @return The feedback stream.
-		 * 
-		 */
-		@SuppressWarnings({ "rawtypes", "unchecked" })
-		public DataStream<F> closeWith(DataStream<F> feedbackStream) {
-
-			Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
-
-			if (!predecessors.contains(this.coFeedbackTransformation)) {
-				throw new UnsupportedOperationException(
-						"Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
-			}
-
-			coFeedbackTransformation.addFeedbackEdge(feedbackStream.getTransformation());
-
-			return feedbackStream;
-		}
-		
-		private UnsupportedOperationException groupingException = new UnsupportedOperationException(
-				"Cannot change the input partitioning of an iteration head directly. Apply the partitioning on the input and feedback streams instead.");
-		
-		@Override
-		public ConnectedStreams<I, F> keyBy(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
-
-		@Override
-		public ConnectedStreams<I, F> keyBy(String field1, String field2) {throw groupingException;}
-
-		@Override
-		public ConnectedStreams<I, F> keyBy(String[] fields1, String[] fields2) {throw groupingException;}
-
-		@Override
-		public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> keySelector1,KeySelector<F, ?> keySelector2) {throw groupingException;}
-
-		@Override
-		public ConnectedStreams<I, F> partitionByHash(int keyPosition1, int keyPosition2) {throw groupingException;}
-		
-		@Override
-		public ConnectedStreams<I, F> partitionByHash(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
-		
-		@Override
-		public ConnectedStreams<I, F> partitionByHash(String field1, String field2) {throw groupingException;}
-		
-		@Override
-		public ConnectedStreams<I, F> partitionByHash(String[] fields1, String[] fields2) {throw groupingException;}
-		
-		@Override
-		public ConnectedStreams<I, F> partitionByHash(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {throw groupingException;}
-		
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b6e762f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
new file mode 100644
index 0000000..346bef9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
+import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+
+import java.util.Collection;
+
+/**
+ * The iterative data stream represents the start of an iteration in a {@link DataStream}.
+ * 
+ * @param <T> Type of the elements in this Stream
+ */
+public class IterativeStream<T> extends SingleOutputStreamOperator<T, IterativeStream<T>> {
+
+	// We store these so that we can create a co-iteration if we need to
+	private DataStream<T> originalInput;
+	private long maxWaitTime;
+	
+	protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
+		super(dataStream.getExecutionEnvironment(),
+				new FeedbackTransformation<T>(dataStream.getTransformation(), maxWaitTime));
+		this.originalInput = dataStream;
+		this.maxWaitTime = maxWaitTime;
+		setBufferTimeout(dataStream.environment.getBufferTimeout());
+	}
+
+	/**
+	 * Closes the iteration. This method defines the end of the iterative
+	 * program part that will be fed back to the start of the iteration.
+	 *
+	 * <p>
+	 * A common usage pattern for streaming iterations is to use output
+	 * splitting to send a part of the closing data stream to the head. Refer to
+	 * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
+	 * for more information.
+	 * 
+	 * @param feedbackStream
+	 *            {@link DataStream} that will be used as input to the iteration
+	 *            head.
+	 *
+	 * @return The feedback stream.
+	 * 
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	public DataStream<T> closeWith(DataStream<T> feedbackStream) {
+
+		Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
+
+		if (!predecessors.contains(this.transformation)) {
+			throw new UnsupportedOperationException(
+					"Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
+		}
+
+		((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation());
+
+		return feedbackStream;
+	}
+
+	/**
+	 * Changes the feedback type of the iteration and allows the user to apply
+	 * co-transformations on the input and feedback stream, as in a
+	 * {@link ConnectedStreams}.
+	 *
+	 * <p>
+	 * For type safety the user needs to define the feedback type
+	 * 
+	 * @param feedbackTypeString
+	 *            String describing the type information of the feedback stream.
+	 * @return A {@link ConnectedIterativeStreams}.
+	 */
+	public <F> ConnectedIterativeStreams<T, F> withFeedbackType(String feedbackTypeString) {
+		return withFeedbackType(TypeInfoParser.<F> parse(feedbackTypeString));
+	}
+
+	/**
+	 * Changes the feedback type of the iteration and allows the user to apply
+	 * co-transformations on the input and feedback stream, as in a
+	 * {@link ConnectedStreams}.
+	 *
+	 * <p>
+	 * For type safety the user needs to define the feedback type
+	 * 
+	 * @param feedbackTypeClass
+	 *            Class of the elements in the feedback stream.
+	 * @return A {@link ConnectedIterativeStreams}.
+	 */
+	public <F> ConnectedIterativeStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
+		return withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass));
+	}
+
+	/**
+	 * Changes the feedback type of the iteration and allows the user to apply
+	 * co-transformations on the input and feedback stream, as in a
+	 * {@link ConnectedStreams}.
+	 *
+	 * <p>
+	 * For type safety the user needs to define the feedback type
+	 * 
+	 * @param feedbackType
+	 *            The type information of the feedback stream.
+	 * @return A {@link ConnectedIterativeStreams}.
+	 */
+	public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
+		return new ConnectedIterativeStreams<T, F>(originalInput, feedbackType, maxWaitTime);
+	}
+	
+	/**
+	 * The {@link ConnectedIterativeStreams} represents the start of an
+	 * iterative part of a streaming program, where the original input of the
+	 * iteration and the feedback of the iteration are connected as in a
+	 * {@link ConnectedStreams}.
+	 *
+	 * <p>
+	 * The user can distinguish between the two inputs using co-transformation,
+	 * thus eliminating the need for mapping the inputs and outputs to a common
+	 * type.
+	 * 
+	 * @param <I>
+	 *            Type of the input of the iteration
+	 * @param <F>
+	 *            Type of the feedback of the iteration
+	 */
+	public static class ConnectedIterativeStreams<I, F> extends ConnectedStreams<I, F> {
+
+		private CoFeedbackTransformation<F> coFeedbackTransformation;
+
+		public ConnectedIterativeStreams(DataStream<I> input,
+				TypeInformation<F> feedbackType,
+				long waitTime) {
+			super(input.getExecutionEnvironment(),
+					input,
+					new DataStream<F>(input.getExecutionEnvironment(),
+							new CoFeedbackTransformation<F>(input.getParallelism(),
+									feedbackType,
+									waitTime)));
+			this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecondInput().getTransformation();
+		}
+
+		/**
+		 * Closes the iteration. This method defines the end of the iterative
+		 * program part that will be fed back to the start of the iteration as
+		 * the second input in the {@link ConnectedStreams}.
+		 * 
+		 * @param feedbackStream
+		 *            {@link DataStream} that will be used as second input to
+		 *            the iteration head.
+		 * @return The feedback stream.
+		 * 
+		 */
+		@SuppressWarnings({ "rawtypes", "unchecked" })
+		public DataStream<F> closeWith(DataStream<F> feedbackStream) {
+
+			Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
+
+			if (!predecessors.contains(this.coFeedbackTransformation)) {
+				throw new UnsupportedOperationException(
+						"Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
+			}
+
+			coFeedbackTransformation.addFeedbackEdge(feedbackStream.getTransformation());
+
+			return feedbackStream;
+		}
+		
+		private UnsupportedOperationException groupingException = new UnsupportedOperationException(
+				"Cannot change the input partitioning of an iteration head directly. Apply the partitioning on the input and feedback streams instead.");
+		
+		@Override
+		public ConnectedStreams<I, F> keyBy(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
+
+		@Override
+		public ConnectedStreams<I, F> keyBy(String field1, String field2) {throw groupingException;}
+
+		@Override
+		public ConnectedStreams<I, F> keyBy(String[] fields1, String[] fields2) {throw groupingException;}
+
+		@Override
+		public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> keySelector1,KeySelector<F, ?> keySelector2) {throw groupingException;}
+
+		@Override
+		public ConnectedStreams<I, F> partitionByHash(int keyPosition1, int keyPosition2) {throw groupingException;}
+		
+		@Override
+		public ConnectedStreams<I, F> partitionByHash(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
+		
+		@Override
+		public ConnectedStreams<I, F> partitionByHash(String field1, String field2) {throw groupingException;}
+		
+		@Override
+		public ConnectedStreams<I, F> partitionByHash(String[] fields1, String[] fields2) {throw groupingException;}
+		
+		@Override
+		public ConnectedStreams<I, F> partitionByHash(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {throw groupingException;}
+		
+	}
+}

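The withFeedbackType(...) variants above turn the iteration into a ConnectedIterativeStreams,
so the original input and the feedback arrive as the two sides of a ConnectedStreams. A
minimal sketch of that pattern follows; the Integer/String element types and the length-based
feedback condition are illustrative only:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class CoIterateSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// input is Integer, feedback is String
		ConnectedIterativeStreams<Integer, String> coIter = env.fromElements(1, 2, 3)
				.iterate(2000)
				.withFeedbackType(String.class);

		// co-transformation: flatMap1 sees the original input, flatMap2 sees the feedback
		DataStream<String> step = coIter.flatMap(new CoFlatMapFunction<Integer, String, String>() {
			@Override
			public void flatMap1(Integer value, Collector<String> out) {
				out.collect(value.toString());
			}
			@Override
			public void flatMap2(String value, Collector<String> out) {
				out.collect(value + "x");
			}
		});

		// close the iteration with a String stream; it becomes the second (feedback) input
		coIter.closeWith(step.filter(new FilterFunction<String>() {
			@Override
			public boolean filter(String value) {
				return value.length() < 5; // illustrative termination condition
			}
		}));

		step.print();
		env.execute("Co-iteration sketch");
	}
}
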
http://git-wip-us.apache.org/repos/asf/flink/blob/7b6e762f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 43371b7..7bdebf8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -31,8 +31,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.IterativeDataStream;
-import org.apache.flink.streaming.api.datastream.IterativeDataStream.ConnectedIterativeDataStreams;
+import org.apache.flink.streaming.api.datastream.IterativeStream;
+import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -67,7 +67,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Integer> source = env.fromElements(1, 10);
 
-		IterativeDataStream<Integer> iter1 = source.iterate();
+		IterativeStream<Integer> iter1 = source.iterate();
 		SingleOutputStreamOperator<Integer, ?> map1 = iter1.map(NoOpIntMap);
 		iter1.closeWith(map1).print();
 	}
@@ -80,7 +80,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		// introduce dummy mapper to get to correct parallelism
 		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
 
-		IterativeDataStream<Integer> iter1 = source.iterate();
+		IterativeStream<Integer> iter1 = source.iterate();
 
 		iter1.closeWith(iter1.map(NoOpIntMap));
 		iter1.closeWith(iter1.map(NoOpIntMap));
@@ -96,7 +96,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		DataStream<Integer> source = env.fromElements(1, 10)
 				.map(NoOpIntMap);
 
-		IterativeDataStream<Integer> iter1 = source.iterate();
+		IterativeStream<Integer> iter1 = source.iterate();
 
 
 		iter1.closeWith(iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2));
@@ -112,7 +112,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		// introduce dummy mapper to get to correct parallelism
 		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
 
-		ConnectedIterativeDataStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
+		ConnectedIterativeStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
 				Integer.class);
 
 
@@ -131,8 +131,8 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		// introduce dummy mapper to get to correct parallelism
 		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
 
-		IterativeDataStream<Integer> iter1 = source.iterate();
-		IterativeDataStream<Integer> iter2 = source.iterate();
+		IterativeStream<Integer> iter1 = source.iterate();
+		IterativeStream<Integer> iter2 = source.iterate();
 
 
 		iter2.closeWith(iter1.map(NoOpIntMap));
@@ -150,8 +150,8 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		// introduce dummy mapper to get to correct parallelism
 		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
 
-		IterativeDataStream<Integer> iter1 = source.iterate();
-		ConnectedIterativeDataStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
+		IterativeStream<Integer> iter1 = source.iterate();
+		ConnectedIterativeStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
 				Integer.class);
 
 
@@ -166,7 +166,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
 
-		IterativeDataStream<Integer> iter1 = source.iterate();
+		IterativeStream<Integer> iter1 = source.iterate();
 
 		iter1.map(NoOpIntMap).print();
 
@@ -179,9 +179,9 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap); // for rebalance
 
-		IterativeDataStream<Integer> iter1 = source.iterate();
+		IterativeStream<Integer> iter1 = source.iterate();
 		// Calling withFeedbackType should create a new iteration
-		ConnectedIterativeDataStreams<Integer, String> iter2 = iter1.withFeedbackType(String.class);
+		ConnectedIterativeStreams<Integer, String> iter2 = iter1.withFeedbackType(String.class);
 
 		iter1.closeWith(iter1.map(NoOpIntMap)).print();
 		iter2.closeWith(iter2.map(NoOpCoMap)).print();
@@ -205,7 +205,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
 				.map(NoOpIntMap).name("ParallelizeMapRebalance");
 
-		IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
+		IterativeStream<Integer> iter1 = source1.union(source2).iterate();
 
 		DataStream<Integer> head1 = iter1.map(NoOpIntMap).name("IterRebalanceMap").setParallelism(DEFAULT_PARALLELISM / 2);
 		DataStream<Integer> head2 = iter1.map(NoOpIntMap).name("IterForwardMap");
@@ -286,7 +286,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
 				.map(NoOpIntMap);
 
-		IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
+		IterativeStream<Integer> iter1 = source1.union(source2).iterate();
 
 		DataStream<Integer> head1 = iter1.map(NoOpIntMap).name("map1");
 		DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2).rebalance().name(
@@ -370,7 +370,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		DataStream<Boolean> source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
 				.map(NoOpBoolMap).name("ParallelizeMap");
 
-		IterativeDataStream<Boolean> iteration = source.iterate(3000);
+		IterativeStream<Boolean> iteration = source.iterate(3000);
 
 		DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap);
 
@@ -395,7 +395,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 				.map(NoOpStrMap).name("ParallelizeMap");
 
 
-		ConnectedIterativeDataStreams<Integer, String> coIt = env.fromElements(0, 0)
+		ConnectedIterativeStreams<Integer, String> coIt = env.fromElements(0, 0)
 				.map(NoOpIntMap).name("ParallelizeMap")
 				.iterate(2000)
 				.withFeedbackType("String");
@@ -476,7 +476,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		DataStream<Integer> source = env.fromElements(1, 2, 3)
 				.map(NoOpIntMap).name("ParallelizeMap");
 
-		IterativeDataStream<Integer> it = source.keyBy(key).iterate(3000);
+		IterativeStream<Integer> it = source.keyBy(key).iterate(3000);
 
 		DataStream<Integer> head = it.flatMap(new RichFlatMapFunction<Integer, Integer>() {
 
@@ -518,7 +518,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 				.map(NoOpBoolMap).name("ParallelizeMap");
 
 
-		IterativeDataStream<Boolean> iteration = source.iterate(3000);
+		IterativeStream<Boolean> iteration = source.iterate(3000);
 
 		iteration.closeWith(iteration.flatMap(new IterationHead())).addSink(new ReceiveCheckNoOpSink<Boolean>());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7b6e762f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 6c439b9..d35c9bd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.datastream.IterativeStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -115,7 +115,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 = env.addSource(new TupleSource()).setParallelism(1);
 
-		IterativeDataStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream1.map(new MapFunction<Tuple2<Long, Tuple2<String, Long>>,Tuple2<Long, Tuple2<String, Long>>>(){
+		IterativeStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream1.map(new MapFunction<Tuple2<Long, Tuple2<String, Long>>,Tuple2<Long, Tuple2<String, Long>>>(){
 
 					Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<Long, Tuple2<String, Long>>(
 							0L, new Tuple2<String, Long>("", 0L));

http://git-wip-us.apache.org/repos/asf/flink/blob/7b6e762f/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 8860e58..af19af7 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.datastream.IterativeStream;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -69,7 +69,7 @@ public class IterateExample {
 		}
 
 		// create an iterative data stream from the input with 5 second timeout
-		IterativeDataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it = inputStream.map(new InputMap())
+		IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it = inputStream.map(new InputMap())
 				.iterate(5000);
 
 		// apply the step function to get the next Fibonacci number
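The example keeps iterating by splitting the step output and selecting which branch is fed
back to the head, as the Javadoc above suggests. A condensed sketch of that split/select
pattern, using a plain Long counter instead of the Fibonacci tuples (illustrative only):

import java.util.Collections;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SplitIterateSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		IterativeStream<Long> it = env.generateSequence(0, 5).iterate(5000);

		// step function: increment
		DataStream<Long> stepped = it.map(new MapFunction<Long, Long>() {
			@Override
			public Long map(Long value) {
				return value + 1;
			}
		});

		// split the step output into a feedback branch and an output branch
		SplitDataStream<Long> split = stepped.split(new OutputSelector<Long>() {
			@Override
			public Iterable<String> select(Long value) {
				return Collections.singletonList(value < 100 ? "iterate" : "output");
			}
		});

		it.closeWith(split.select("iterate")); // fed back to the iteration head
		split.select("output").print();        // leaves the iteration

		env.execute("Split iteration sketch");
	}
}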