You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/05/29 15:46:08 UTC
[4/6] flink git commit: [FLINK-1687] [streaming] [api-breaking]
fromCollection and generateSequence rework
[FLINK-1687] [streaming] [api-breaking] fromCollection and generateSequence rework
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/120bd0f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/120bd0f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/120bd0f4
Branch: refs/heads/master
Commit: 120bd0f428fd4ca4051d052ee9a79728d55e72d7
Parents: 1c3b67e
Author: mbalassi <mb...@apache.org>
Authored: Sat May 23 22:34:06 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri May 29 15:08:32 2015 +0200
----------------------------------------------------------------------
.../environment/StreamExecutionEnvironment.java | 28 +++--
.../functions/source/FromIteratorFunction.java | 3 -
.../source/FromSplittableIteratorFunction.java | 52 +++++++++
.../functions/source/GenSequenceFunction.java | 55 ---------
.../api/StreamExecutionEnvironmentTest.java | 113 +++++++++++++++++++
.../api/collector/DirectedOutputTest.java | 2 +-
.../api/complex/ComplexIntegrationTest.java | 6 +-
.../streaming/api/operators/ProjectTest.java | 2 +-
.../api/streamtask/StreamVertexTest.java | 2 +-
9 files changed, 188 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index a1fe60d..09a8788 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -55,7 +55,7 @@ import org.apache.flink.streaming.api.functions.source.FileReadFunction;
import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
-import org.apache.flink.streaming.api.functions.source.GenSequenceFunction;
+import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -418,7 +418,7 @@ public abstract class StreamExecutionEnvironment {
if (from > to) {
throw new IllegalArgumentException("Start of sequence must not be greater than the end");
}
- return addSource(new GenSequenceFunction(from, to), "Sequence Source");
+ return fromCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Sequence Source");
}
/**
@@ -432,8 +432,8 @@ public abstract class StreamExecutionEnvironment {
* @return A data stream, containing all number in the [from, to] interval
*/
public DataStreamSource<Long> generateParallelSequence(long from, long to) {
- return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Parellel " +
- "Sequence source");
+ return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Parallel " +
+ "Sequence Source");
}
/**
@@ -456,7 +456,7 @@ public abstract class StreamExecutionEnvironment {
* The type of the returned data stream
* @return The data stream representing the given array of elements
*/
- public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
+ public <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
if (data.length == 0) {
throw new IllegalArgumentException(
"fromElements needs at least one element as argument");
@@ -489,7 +489,7 @@ public abstract class StreamExecutionEnvironment {
* The type of the returned data stream
* @return The data stream representing the given collection
*/
- public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
+ public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
Preconditions.checkNotNull(data, "Collection must not be null");
if (data.isEmpty()) {
throw new IllegalArgumentException("Collection must not be empty");
@@ -518,7 +518,7 @@ public abstract class StreamExecutionEnvironment {
* The type of the returned data stream
* @return The data stream representing the given collection
*/
- public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT>
+ public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT>
typeInfo) {
Preconditions.checkNotNull(data, "Collection must not be null");
if (data.isEmpty()) {
@@ -552,7 +552,7 @@ public abstract class StreamExecutionEnvironment {
* @return The data stream representing the elements in the iterator
* @see #fromCollection(java.util.Iterator, org.apache.flink.api.common.typeinfo.TypeInformation)
*/
- public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type) {
+ public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type) {
return fromCollection(data, TypeExtractor.getForClass(type));
}
@@ -578,7 +578,7 @@ public abstract class StreamExecutionEnvironment {
* The type of the returned data stream
* @return The data stream representing the elements in the iterator
*/
- public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT>
+ public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT>
typeInfo) {
Preconditions.checkNotNull(data, "The iterator must not be null");
if (!(data instanceof Serializable)) {
@@ -589,6 +589,12 @@ public abstract class StreamExecutionEnvironment {
return addSource(function, "Collection Source").returns(typeInfo);
}
+ // private helper for passing different names
+ private <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> iterator, TypeInformation<OUT>
+ typeInfo, String operatorName) {
+ return addSource(new FromIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo);
+ }
+
/**
* Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
* framework to create a parallel data stream source that returns the elements in the iterator. The iterator
@@ -636,13 +642,13 @@ public abstract class StreamExecutionEnvironment {
*/
public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT>
typeInfo) {
- return fromParallelCollection(iterator, typeInfo, "Parallel Collection source");
+ return fromParallelCollection(iterator, typeInfo, "Parallel Collection Source");
}
// private helper for passing different names
private <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT>
typeInfo, String operatorName) {
- return addSource(new FromIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo);
+ return addSource(new FromSplittableIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
index d46b1f2..125b88b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
@@ -17,9 +17,6 @@
package org.apache.flink.streaming.api.functions.source;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
import java.util.Iterator;
public class FromIteratorFunction<T> implements SourceFunction<T> {
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
new file mode 100644
index 0000000..fd86858
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.SplittableIterator;
+
+import java.util.Iterator;
+
+public class FromSplittableIteratorFunction<T> extends RichParallelSourceFunction<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ SplittableIterator<T> fullIterator;
+ Iterator<T> iterator;
+
+ public FromSplittableIteratorFunction(SplittableIterator<T> iterator) {
+ this.fullIterator = iterator;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ int numberOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexofThisSubTask = getRuntimeContext().getIndexOfThisSubtask();
+ iterator = fullIterator.split(numberOfSubTasks)[indexofThisSubTask];
+ }
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return !iterator.hasNext();
+ }
+
+ @Override
+ public T next() throws Exception {
+ return iterator.next();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
deleted file mode 100644
index 7d302d2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.NumberSequenceIterator;
-
-/**
- * Source Function used to generate the number sequence
- *
- */
-public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
-
- private static final long serialVersionUID = 1L;
-
- private NumberSequenceIterator fullIterator;
- private NumberSequenceIterator splitIterator;
-
- public GenSequenceFunction(long from, long to) {
- fullIterator = new NumberSequenceIterator(from, to);
- }
-
- @Override
- public void open(Configuration config) {
- int splitNumber = getRuntimeContext().getIndexOfThisSubtask();
- int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
- splitIterator = fullIterator.split(numOfSubTasks)[splitNumber];
- }
-
- @Override
- public boolean reachedEnd() throws Exception {
- return !splitIterator.hasNext();
- }
-
- @Override
- public Long next() throws Exception {
- return splitIterator.next();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
new file mode 100644
index 0000000..cbbd409
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.SplittableIterator;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertTrue;
+
+public class StreamExecutionEnvironmentTest {
+
+ private static final long MEMORYSIZE = 32;
+ private static int PARALLELISM = 4;
+
+ @Test
+ public void testFromCollectionParallelism() {
+ TypeInformation<Object> typeInfo = TypeExtractor.getForClass(Object.class);
+ StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+ boolean seenExpectedException = false;
+
+ try {
+ DataStream<Object> dataStream1 = env.fromCollection(new DummySplittableIterator(), typeInfo)
+ .setParallelism(4);
+ } catch (IllegalArgumentException e) {
+ seenExpectedException = true;
+ }
+
+ DataStream<Object> dataStream2 = env.fromParallelCollection(new DummySplittableIterator(), typeInfo)
+ .setParallelism(4);
+
+ String plan = env.getExecutionPlan();
+
+ assertTrue("Expected Exception for setting parallelism was not thrown.", seenExpectedException);
+ assertTrue("Parallelism for dataStream1 is not right.",
+ plan.contains("\"contents\":\"Collection Source\",\"parallelism\":1"));
+ assertTrue("Parallelism for dataStream2 is not right.",
+ plan.contains("\"contents\":\"Parallel Collection Source\",\"parallelism\":4"));
+ }
+
+ @Test
+ public void testGenerateSequenceParallelism() throws Exception {
+ StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+ boolean seenExpectedException = false;
+
+ try {
+ DataStream<Long> dataStream1 = env.generateSequence(0,0).setParallelism(4);
+ } catch (IllegalArgumentException e) {
+ seenExpectedException = true;
+ }
+
+ DataStream<Long> dataStream2 = env.generateParallelSequence(0,0).setParallelism(4);
+
+ String plan = env.getExecutionPlan();
+
+ assertTrue("Expected Exception for setting parallelism was not thrown.", seenExpectedException);
+ assertTrue("Parallelism for dataStream1 is not right.",
+ plan.contains("\"contents\":\"Sequence Source\",\"parallelism\":1"));
+ assertTrue("Parallelism for dataStream2 is not right.",
+ plan.contains("\"contents\":\"Parallel Sequence Source\",\"parallelism\":4"));
+ }
+
+ public static class DummySplittableIterator extends SplittableIterator {
+ private static final long serialVersionUID = 1312752876092210499L;
+
+ @Override
+ public Iterator[] split(int numPartitions) {
+ return new Iterator[0];
+ }
+
+ @Override
+ public int getMaximumNumberOfSplits() {
+ return 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public Object next() {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index fc3e36f..56b6ae8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -101,7 +101,7 @@ public class DirectedOutputTest {
TestListResultSink<Long> evenAndOddSink = new TestListResultSink<Long>();
TestListResultSink<Long> allSink = new TestListResultSink<Long>();
- SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
+ SplitDataStream<Long> source = env.generateParallelSequence(1, 11).split(new MyOutputSelector());
source.select(EVEN).addSink(evenSink);
source.select(ODD, TEN).addSink(oddAndTenSink);
source.select(EVEN, ODD).addSink(evenAndOddSink);
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 908671d..d321851 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -222,8 +222,8 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
env.setBufferTimeout(0);
- DataStream<Long> sourceStream31 = env.generateSequence(1, 10000);
- DataStream<Long> sourceStream32 = env.generateSequence(10001, 20000);
+ DataStream<Long> sourceStream31 = env.generateParallelSequence(1, 10000);
+ DataStream<Long> sourceStream32 = env.generateParallelSequence(10001, 20000);
sourceStream31.filter(new PrimeFilterFunction())
.window(Count.of(100))
@@ -308,7 +308,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
env.setBufferTimeout(0);
- DataStream<Long> dataStream51 = env.generateSequence(1, 5)
+ DataStream<Long> dataStream51 = env.generateParallelSequence(1, 5)
.map(new MapFunction<Long, Long>() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
index d9cc607..c09afee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
@@ -92,7 +92,7 @@ public class ProjectTest implements Serializable {
StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
- env.generateSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
+ env.generateParallelSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
@Override
public Tuple3<Long, Character, Double> map(Long value) throws Exception {
return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
index 0bb1848..90cb7b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
@@ -157,7 +157,7 @@ public class StreamVertexTest {
StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE);
DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
- DataStream<Long> generatedSequence = env.generateSequence(0, 3);
+ DataStream<Long> generatedSequence = env.generateParallelSequence(0, 3);
fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());