You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:46 UTC
[12/13] flink git commit: [FLINK-2550] Rename IterativeDataStream to
IterativeStream
[FLINK-2550] Rename IterativeDataStream to IterativeStream
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b6e762f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b6e762f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b6e762f
Branch: refs/heads/master
Commit: 7b6e762fda09e5edb1d5cca2398cb8035d6794b4
Parents: 0de9d2e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Oct 5 11:42:53 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 5 16:36:35 2015 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 28 +--
.../api/datastream/IterativeDataStream.java | 218 -------------------
.../api/datastream/IterativeStream.java | 218 +++++++++++++++++++
.../apache/flink/streaming/api/IterateTest.java | 38 ++--
.../api/complex/ComplexIntegrationTest.java | 4 +-
.../examples/iteration/IterateExample.java | 4 +-
6 files changed, 255 insertions(+), 255 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7b6e762f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 32d9012..003ef36 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -467,14 +467,14 @@ public class DataStream<T> {
/**
* Initiates an iterative part of the program that feeds back data streams.
* The iterative part needs to be closed by calling
- * {@link IterativeDataStream#closeWith(DataStream)}. The transformation of
- * this IterativeDataStream will be the iteration head. The data stream
- * given to the {@link IterativeDataStream#closeWith(DataStream)} method is
+ * {@link IterativeStream#closeWith(DataStream)}. The transformation of
+ * this IterativeStream will be the iteration head. The data stream
+ * given to the {@link IterativeStream#closeWith(DataStream)} method is
* the data stream that will be fed back and used as the input for the
* iteration head. The user can also use different feedback type than the
* input of the iteration and treat the input and feedback streams as a
* {@link ConnectedStreams} be calling
- * {@link IterativeDataStream#withFeedbackType(TypeInformation)}
+ * {@link IterativeStream#withFeedbackType(TypeInformation)}
* <p>
* A common usage pattern for streaming iterations is to use output
* splitting to send a part of the closing data stream to the head. Refer to
@@ -482,7 +482,7 @@ public class DataStream<T> {
* <p>
* The iteration edge will be partitioned the same way as the first input of
* the iteration head unless it is changed in the
- * {@link IterativeDataStream#closeWith(DataStream)} call.
+ * {@link IterativeStream#closeWith(DataStream)} call.
* <p>
* By default a DataStream with iteration will never terminate, but the user
* can use the maxWaitTime parameter to set a max waiting time for the
@@ -491,21 +491,21 @@ public class DataStream<T> {
*
* @return The iterative data stream created.
*/
- public IterativeDataStream<T> iterate() {
- return new IterativeDataStream<T>(this, 0);
+ public IterativeStream<T> iterate() {
+ return new IterativeStream<T>(this, 0);
}
/**
* Initiates an iterative part of the program that feeds back data streams.
* The iterative part needs to be closed by calling
- * {@link IterativeDataStream#closeWith(DataStream)}. The transformation of
- * this IterativeDataStream will be the iteration head. The data stream
- * given to the {@link IterativeDataStream#closeWith(DataStream)} method is
+ * {@link IterativeStream#closeWith(DataStream)}. The transformation of
+ * this IterativeStream will be the iteration head. The data stream
+ * given to the {@link IterativeStream#closeWith(DataStream)} method is
* the data stream that will be fed back and used as the input for the
* iteration head. The user can also use different feedback type than the
* input of the iteration and treat the input and feedback streams as a
* {@link ConnectedStreams} be calling
- * {@link IterativeDataStream#withFeedbackType(TypeInformation)}
+ * {@link IterativeStream#withFeedbackType(TypeInformation)}
* <p>
* A common usage pattern for streaming iterations is to use output
* splitting to send a part of the closing data stream to the head. Refer to
@@ -513,7 +513,7 @@ public class DataStream<T> {
* <p>
* The iteration edge will be partitioned the same way as the first input of
* the iteration head unless it is changed in the
- * {@link IterativeDataStream#closeWith(DataStream)} call.
+ * {@link IterativeStream#closeWith(DataStream)} call.
* <p>
* By default a DataStream with iteration will never terminate, but the user
* can use the maxWaitTime parameter to set a max waiting time for the
@@ -526,8 +526,8 @@ public class DataStream<T> {
*
* @return The iterative data stream created.
*/
- public IterativeDataStream<T> iterate(long maxWaitTimeMillis) {
- return new IterativeDataStream<T>(this, maxWaitTimeMillis);
+ public IterativeStream<T> iterate(long maxWaitTimeMillis) {
+ return new IterativeStream<T>(this, maxWaitTimeMillis);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/7b6e762f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
deleted file mode 100644
index 75216ec..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
-import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
-import org.apache.flink.streaming.api.transformations.StreamTransformation;
-
-import java.util.Collection;
-
-/**
- * The iterative data stream represents the start of an iteration in a {@link DataStream}.
- *
- * @param <T> Type of the elements in this Stream
- */
-public class IterativeDataStream<T> extends SingleOutputStreamOperator<T, IterativeDataStream<T>> {
-
- // We store these so that we can create a co-iteration if we need to
- private DataStream<T> originalInput;
- private long maxWaitTime;
-
- protected IterativeDataStream(DataStream<T> dataStream, long maxWaitTime) {
- super(dataStream.getExecutionEnvironment(),
- new FeedbackTransformation<T>(dataStream.getTransformation(), maxWaitTime));
- this.originalInput = dataStream;
- this.maxWaitTime = maxWaitTime;
- setBufferTimeout(dataStream.environment.getBufferTimeout());
- }
-
- /**
- * Closes the iteration. This method defines the end of the iterative
- * program part that will be fed back to the start of the iteration.
- *
- * <p>
- * A common usage pattern for streaming iterations is to use output
- * splitting to send a part of the closing data stream to the head. Refer to
- * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
- * for more information.
- *
- * @param feedbackStream
- * {@link DataStream} that will be used as input to the iteration
- * head.
- *
- * @return The feedback stream.
- *
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public DataStream<T> closeWith(DataStream<T> feedbackStream) {
-
- Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
-
- if (!predecessors.contains(this.transformation)) {
- throw new UnsupportedOperationException(
- "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
- }
-
- ((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation());
-
- return feedbackStream;
- }
-
- /**
- * Changes the feedback type of the iteration and allows the user to apply
- * co-transformations on the input and feedback stream, as in a
- * {@link ConnectedStreams}.
- *
- * <p>
- * For type safety the user needs to define the feedback type
- *
- * @param feedbackTypeString
- * String describing the type information of the feedback stream.
- * @return A {@link ConnectedIterativeDataStreams}.
- */
- public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(String feedbackTypeString) {
- return withFeedbackType(TypeInfoParser.<F> parse(feedbackTypeString));
- }
-
- /**
- * Changes the feedback type of the iteration and allows the user to apply
- * co-transformations on the input and feedback stream, as in a
- * {@link ConnectedStreams}.
- *
- * <p>
- * For type safety the user needs to define the feedback type
- *
- * @param feedbackTypeClass
- * Class of the elements in the feedback stream.
- * @return A {@link ConnectedIterativeDataStreams}.
- */
- public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
- return withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass));
- }
-
- /**
- * Changes the feedback type of the iteration and allows the user to apply
- * co-transformations on the input and feedback stream, as in a
- * {@link ConnectedStreams}.
- *
- * <p>
- * For type safety the user needs to define the feedback type
- *
- * @param feedbackType
- * The type information of the feedback stream.
- * @return A {@link ConnectedIterativeDataStreams}.
- */
- public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
- return new ConnectedIterativeDataStreams<T, F>(originalInput, feedbackType, maxWaitTime);
- }
-
- /**
- * The {@link ConnectedIterativeDataStreams} represent a start of an
- * iterative part of a streaming program, where the original input of the
- * iteration and the feedback of the iteration are connected as in a
- * {@link ConnectedStreams}.
- *
- * <p>
- * The user can distinguish between the two inputs using co-transformation,
- * thus eliminating the need for mapping the inputs and outputs to a common
- * type.
- *
- * @param <I>
- * Type of the input of the iteration
- * @param <F>
- * Type of the feedback of the iteration
- */
- public static class ConnectedIterativeDataStreams<I, F> extends ConnectedStreams<I, F> {
-
- private CoFeedbackTransformation<F> coFeedbackTransformation;
-
- public ConnectedIterativeDataStreams(DataStream<I> input,
- TypeInformation<F> feedbackType,
- long waitTime) {
- super(input.getExecutionEnvironment(),
- input,
- new DataStream<F>(input.getExecutionEnvironment(),
- new CoFeedbackTransformation<F>(input.getParallelism(),
- feedbackType,
- waitTime)));
- this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecondInput().getTransformation();
- }
-
- /**
- * Closes the iteration. This method defines the end of the iterative
- * program part that will be fed back to the start of the iteration as
- * the second input in the {@link ConnectedStreams}.
- *
- * @param feedbackStream
- * {@link DataStream} that will be used as second input to
- * the iteration head.
- * @return The feedback stream.
- *
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public DataStream<F> closeWith(DataStream<F> feedbackStream) {
-
- Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
-
- if (!predecessors.contains(this.coFeedbackTransformation)) {
- throw new UnsupportedOperationException(
- "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
- }
-
- coFeedbackTransformation.addFeedbackEdge(feedbackStream.getTransformation());
-
- return feedbackStream;
- }
-
- private UnsupportedOperationException groupingException = new UnsupportedOperationException(
- "Cannot change the input partitioning of an iteration head directly. Apply the partitioning on the input and feedback streams instead.");
-
- @Override
- public ConnectedStreams<I, F> keyBy(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
-
- @Override
- public ConnectedStreams<I, F> keyBy(String field1, String field2) {throw groupingException;}
-
- @Override
- public ConnectedStreams<I, F> keyBy(String[] fields1, String[] fields2) {throw groupingException;}
-
- @Override
- public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> keySelector1,KeySelector<F, ?> keySelector2) {throw groupingException;}
-
- @Override
- public ConnectedStreams<I, F> partitionByHash(int keyPosition1, int keyPosition2) {throw groupingException;}
-
- @Override
- public ConnectedStreams<I, F> partitionByHash(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
-
- @Override
- public ConnectedStreams<I, F> partitionByHash(String field1, String field2) {throw groupingException;}
-
- @Override
- public ConnectedStreams<I, F> partitionByHash(String[] fields1, String[] fields2) {throw groupingException;}
-
- @Override
- public ConnectedStreams<I, F> partitionByHash(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {throw groupingException;}
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7b6e762f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
new file mode 100644
index 0000000..346bef9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
+import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+
+import java.util.Collection;
+
+/**
+ * The iterative data stream represents the start of an iteration in a {@link DataStream}.
+ *
+ * @param <T> Type of the elements in this Stream
+ */
+public class IterativeStream<T> extends SingleOutputStreamOperator<T, IterativeStream<T>> {
+
+ // We store these so that we can create a co-iteration if we need to
+ private DataStream<T> originalInput;
+ private long maxWaitTime;
+
+ protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
+ super(dataStream.getExecutionEnvironment(),
+ new FeedbackTransformation<T>(dataStream.getTransformation(), maxWaitTime));
+ this.originalInput = dataStream;
+ this.maxWaitTime = maxWaitTime;
+ setBufferTimeout(dataStream.environment.getBufferTimeout());
+ }
+
+ /**
+ * Closes the iteration. This method defines the end of the iterative
+ * program part that will be fed back to the start of the iteration.
+ *
+ * <p>
+ * A common usage pattern for streaming iterations is to use output
+ * splitting to send a part of the closing data stream to the head. Refer to
+ * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
+ * for more information.
+ *
+ * @param feedbackStream
+ * {@link DataStream} that will be used as input to the iteration
+ * head.
+ *
+ * @return The feedback stream.
+ *
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public DataStream<T> closeWith(DataStream<T> feedbackStream) {
+
+ Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
+
+ if (!predecessors.contains(this.transformation)) {
+ throw new UnsupportedOperationException(
+ "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
+ }
+
+ ((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation());
+
+ return feedbackStream;
+ }
+
+ /**
+ * Changes the feedback type of the iteration and allows the user to apply
+ * co-transformations on the input and feedback stream, as in a
+ * {@link ConnectedStreams}.
+ *
+ * <p>
+ * For type safety the user needs to define the feedback type
+ *
+ * @param feedbackTypeString
+ * String describing the type information of the feedback stream.
+ * @return A {@link ConnectedIterativeStreams}.
+ */
+ public <F> ConnectedIterativeStreams<T, F> withFeedbackType(String feedbackTypeString) {
+ return withFeedbackType(TypeInfoParser.<F> parse(feedbackTypeString));
+ }
+
+ /**
+ * Changes the feedback type of the iteration and allows the user to apply
+ * co-transformations on the input and feedback stream, as in a
+ * {@link ConnectedStreams}.
+ *
+ * <p>
+ * For type safety the user needs to define the feedback type
+ *
+ * @param feedbackTypeClass
+ * Class of the elements in the feedback stream.
+ * @return A {@link ConnectedIterativeStreams}.
+ */
+ public <F> ConnectedIterativeStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
+ return withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass));
+ }
+
+ /**
+ * Changes the feedback type of the iteration and allows the user to apply
+ * co-transformations on the input and feedback stream, as in a
+ * {@link ConnectedStreams}.
+ *
+ * <p>
+ * For type safety the user needs to define the feedback type
+ *
+ * @param feedbackType
+ * The type information of the feedback stream.
+ * @return A {@link ConnectedIterativeStreams}.
+ */
+ public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
+ return new ConnectedIterativeStreams<T, F>(originalInput, feedbackType, maxWaitTime);
+ }
+
+ /**
+ * The {@link ConnectedIterativeStreams} represent a start of an
+ * iterative part of a streaming program, where the original input of the
+ * iteration and the feedback of the iteration are connected as in a
+ * {@link ConnectedStreams}.
+ *
+ * <p>
+ * The user can distinguish between the two inputs using co-transformation,
+ * thus eliminating the need for mapping the inputs and outputs to a common
+ * type.
+ *
+ * @param <I>
+ * Type of the input of the iteration
+ * @param <F>
+ * Type of the feedback of the iteration
+ */
+ public static class ConnectedIterativeStreams<I, F> extends ConnectedStreams<I, F> {
+
+ private CoFeedbackTransformation<F> coFeedbackTransformation;
+
+ public ConnectedIterativeStreams(DataStream<I> input,
+ TypeInformation<F> feedbackType,
+ long waitTime) {
+ super(input.getExecutionEnvironment(),
+ input,
+ new DataStream<F>(input.getExecutionEnvironment(),
+ new CoFeedbackTransformation<F>(input.getParallelism(),
+ feedbackType,
+ waitTime)));
+ this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecondInput().getTransformation();
+ }
+
+ /**
+ * Closes the iteration. This method defines the end of the iterative
+ * program part that will be fed back to the start of the iteration as
+ * the second input in the {@link ConnectedStreams}.
+ *
+ * @param feedbackStream
+ * {@link DataStream} that will be used as second input to
+ * the iteration head.
+ * @return The feedback stream.
+ *
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public DataStream<F> closeWith(DataStream<F> feedbackStream) {
+
+ Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
+
+ if (!predecessors.contains(this.coFeedbackTransformation)) {
+ throw new UnsupportedOperationException(
+ "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
+ }
+
+ coFeedbackTransformation.addFeedbackEdge(feedbackStream.getTransformation());
+
+ return feedbackStream;
+ }
+
+ private UnsupportedOperationException groupingException = new UnsupportedOperationException(
+ "Cannot change the input partitioning of an iteration head directly. Apply the partitioning on the input and feedback streams instead.");
+
+ @Override
+ public ConnectedStreams<I, F> keyBy(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
+
+ @Override
+ public ConnectedStreams<I, F> keyBy(String field1, String field2) {throw groupingException;}
+
+ @Override
+ public ConnectedStreams<I, F> keyBy(String[] fields1, String[] fields2) {throw groupingException;}
+
+ @Override
+ public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> keySelector1,KeySelector<F, ?> keySelector2) {throw groupingException;}
+
+ @Override
+ public ConnectedStreams<I, F> partitionByHash(int keyPosition1, int keyPosition2) {throw groupingException;}
+
+ @Override
+ public ConnectedStreams<I, F> partitionByHash(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
+
+ @Override
+ public ConnectedStreams<I, F> partitionByHash(String field1, String field2) {throw groupingException;}
+
+ @Override
+ public ConnectedStreams<I, F> partitionByHash(String[] fields1, String[] fields2) {throw groupingException;}
+
+ @Override
+ public ConnectedStreams<I, F> partitionByHash(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {throw groupingException;}
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7b6e762f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 43371b7..7bdebf8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -31,8 +31,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.IterativeDataStream;
-import org.apache.flink.streaming.api.datastream.IterativeDataStream.ConnectedIterativeDataStreams;
+import org.apache.flink.streaming.api.datastream.IterativeStream;
+import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -67,7 +67,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
DataStream<Integer> source = env.fromElements(1, 10);
- IterativeDataStream<Integer> iter1 = source.iterate();
+ IterativeStream<Integer> iter1 = source.iterate();
SingleOutputStreamOperator<Integer, ?> map1 = iter1.map(NoOpIntMap);
iter1.closeWith(map1).print();
}
@@ -80,7 +80,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
// introduce dummy mapper to get to correct parallelism
DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
- IterativeDataStream<Integer> iter1 = source.iterate();
+ IterativeStream<Integer> iter1 = source.iterate();
iter1.closeWith(iter1.map(NoOpIntMap));
iter1.closeWith(iter1.map(NoOpIntMap));
@@ -96,7 +96,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
DataStream<Integer> source = env.fromElements(1, 10)
.map(NoOpIntMap);
- IterativeDataStream<Integer> iter1 = source.iterate();
+ IterativeStream<Integer> iter1 = source.iterate();
iter1.closeWith(iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2));
@@ -112,7 +112,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
// introduce dummy mapper to get to correct parallelism
DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
- ConnectedIterativeDataStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
+ ConnectedIterativeStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
Integer.class);
@@ -131,8 +131,8 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
// introduce dummy mapper to get to correct parallelism
DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
- IterativeDataStream<Integer> iter1 = source.iterate();
- IterativeDataStream<Integer> iter2 = source.iterate();
+ IterativeStream<Integer> iter1 = source.iterate();
+ IterativeStream<Integer> iter2 = source.iterate();
iter2.closeWith(iter1.map(NoOpIntMap));
@@ -150,8 +150,8 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
// introduce dummy mapper to get to correct parallelism
DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
- IterativeDataStream<Integer> iter1 = source.iterate();
- ConnectedIterativeDataStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
+ IterativeStream<Integer> iter1 = source.iterate();
+ ConnectedIterativeStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
Integer.class);
@@ -166,7 +166,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
- IterativeDataStream<Integer> iter1 = source.iterate();
+ IterativeStream<Integer> iter1 = source.iterate();
iter1.map(NoOpIntMap).print();
@@ -179,9 +179,9 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap); // for rebalance
- IterativeDataStream<Integer> iter1 = source.iterate();
+ IterativeStream<Integer> iter1 = source.iterate();
// Calling withFeedbackType should create a new iteration
- ConnectedIterativeDataStreams<Integer, String> iter2 = iter1.withFeedbackType(String.class);
+ ConnectedIterativeStreams<Integer, String> iter2 = iter1.withFeedbackType(String.class);
iter1.closeWith(iter1.map(NoOpIntMap)).print();
iter2.closeWith(iter2.map(NoOpCoMap)).print();
@@ -205,7 +205,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
.map(NoOpIntMap).name("ParallelizeMapRebalance");
- IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
+ IterativeStream<Integer> iter1 = source1.union(source2).iterate();
DataStream<Integer> head1 = iter1.map(NoOpIntMap).name("IterRebalanceMap").setParallelism(DEFAULT_PARALLELISM / 2);
DataStream<Integer> head2 = iter1.map(NoOpIntMap).name("IterForwardMap");
@@ -286,7 +286,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
.map(NoOpIntMap);
- IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
+ IterativeStream<Integer> iter1 = source1.union(source2).iterate();
DataStream<Integer> head1 = iter1.map(NoOpIntMap).name("map1");
DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2).rebalance().name(
@@ -370,7 +370,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
DataStream<Boolean> source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
.map(NoOpBoolMap).name("ParallelizeMap");
- IterativeDataStream<Boolean> iteration = source.iterate(3000);
+ IterativeStream<Boolean> iteration = source.iterate(3000);
DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap);
@@ -395,7 +395,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
.map(NoOpStrMap).name("ParallelizeMap");
- ConnectedIterativeDataStreams<Integer, String> coIt = env.fromElements(0, 0)
+ ConnectedIterativeStreams<Integer, String> coIt = env.fromElements(0, 0)
.map(NoOpIntMap).name("ParallelizeMap")
.iterate(2000)
.withFeedbackType("String");
@@ -476,7 +476,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
DataStream<Integer> source = env.fromElements(1, 2, 3)
.map(NoOpIntMap).name("ParallelizeMap");
- IterativeDataStream<Integer> it = source.keyBy(key).iterate(3000);
+ IterativeStream<Integer> it = source.keyBy(key).iterate(3000);
DataStream<Integer> head = it.flatMap(new RichFlatMapFunction<Integer, Integer>() {
@@ -518,7 +518,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
.map(NoOpBoolMap).name("ParallelizeMap");
- IterativeDataStream<Boolean> iteration = source.iterate(3000);
+ IterativeStream<Boolean> iteration = source.iterate(3000);
iteration.closeWith(iteration.flatMap(new IterationHead())).addSink(new ReceiveCheckNoOpSink<Boolean>());
http://git-wip-us.apache.org/repos/asf/flink/blob/7b6e762f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 6c439b9..d35c9bd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -115,7 +115,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 = env.addSource(new TupleSource()).setParallelism(1);
- IterativeDataStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream1.map(new MapFunction<Tuple2<Long, Tuple2<String, Long>>,Tuple2<Long, Tuple2<String, Long>>>(){
+ IterativeStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream1.map(new MapFunction<Tuple2<Long, Tuple2<String, Long>>,Tuple2<Long, Tuple2<String, Long>>>(){
Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<Long, Tuple2<String, Long>>(
0L, new Tuple2<String, Long>("", 0L));
http://git-wip-us.apache.org/repos/asf/flink/blob/7b6e762f/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 8860e58..af19af7 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -69,7 +69,7 @@ public class IterateExample {
}
// create an iterative data stream from the input with 5 second timeout
- IterativeDataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it = inputStream.map(new InputMap())
+ IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it = inputStream.map(new InputMap())
.iterate(5000);
// apply the step function to get the next Fibonacci number