You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/05/29 15:46:08 UTC

[4/6] flink git commit: [FLINK-1687] [streaming] [api-breaking] fromCollection and generateSequence rework

[FLINK-1687] [streaming] [api-breaking] fromCollection and generateSequence rework


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/120bd0f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/120bd0f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/120bd0f4

Branch: refs/heads/master
Commit: 120bd0f428fd4ca4051d052ee9a79728d55e72d7
Parents: 1c3b67e
Author: mbalassi <mb...@apache.org>
Authored: Sat May 23 22:34:06 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri May 29 15:08:32 2015 +0200

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java |  28 +++--
 .../functions/source/FromIteratorFunction.java  |   3 -
 .../source/FromSplittableIteratorFunction.java  |  52 +++++++++
 .../functions/source/GenSequenceFunction.java   |  55 ---------
 .../api/StreamExecutionEnvironmentTest.java     | 113 +++++++++++++++++++
 .../api/collector/DirectedOutputTest.java       |   2 +-
 .../api/complex/ComplexIntegrationTest.java     |   6 +-
 .../streaming/api/operators/ProjectTest.java    |   2 +-
 .../api/streamtask/StreamVertexTest.java        |   2 +-
 9 files changed, 188 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index a1fe60d..09a8788 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -55,7 +55,7 @@ import org.apache.flink.streaming.api.functions.source.FileReadFunction;
 import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
-import org.apache.flink.streaming.api.functions.source.GenSequenceFunction;
+import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -418,7 +418,7 @@ public abstract class StreamExecutionEnvironment {
 		if (from > to) {
 			throw new IllegalArgumentException("Start of sequence must not be greater than the end");
 		}
-		return addSource(new GenSequenceFunction(from, to), "Sequence Source");
+		return fromCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Sequence Source");
 	}
 
 	/**
@@ -432,8 +432,8 @@ public abstract class StreamExecutionEnvironment {
 	 * @return A data stream, containing all number in the [from, to] interval
 	 */
 	public DataStreamSource<Long> generateParallelSequence(long from, long to) {
-		return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Parellel " +
-				"Sequence source");
+		return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Parallel " +
+				"Sequence Source");
 	}
 
 	/**
@@ -456,7 +456,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The type of the returned data stream
 	 * @return The data stream representing the given array of elements
 	 */
-	public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
+	public <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
 		if (data.length == 0) {
 			throw new IllegalArgumentException(
 					"fromElements needs at least one element as argument");
@@ -489,7 +489,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The type of the returned data stream
 	 * @return The data stream representing the given collection
 	 */
-	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
+	public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
 		Preconditions.checkNotNull(data, "Collection must not be null");
 		if (data.isEmpty()) {
 			throw new IllegalArgumentException("Collection must not be empty");
@@ -518,7 +518,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The type of the returned data stream
 	 * @return The data stream representing the given collection
 	 */
-	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT>
+	public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT>
 			typeInfo) {
 		Preconditions.checkNotNull(data, "Collection must not be null");
 		if (data.isEmpty()) {
@@ -552,7 +552,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @return The data stream representing the elements in the iterator
 	 * @see #fromCollection(java.util.Iterator, org.apache.flink.api.common.typeinfo.TypeInformation)
 	 */
-	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type) {
+	public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type) {
 		return fromCollection(data, TypeExtractor.getForClass(type));
 	}
 
@@ -578,7 +578,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The type of the returned data stream
 	 * @return The data stream representing the elements in the iterator
 	 */
-	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT>
+	public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT>
 			typeInfo) {
 		Preconditions.checkNotNull(data, "The iterator must not be null");
 		if (!(data instanceof Serializable)) {
@@ -589,6 +589,12 @@ public abstract class StreamExecutionEnvironment {
 		return addSource(function, "Collection Source").returns(typeInfo);
 	}
 
+	// private helper for passing different operator names; generateSequence calls it with "Sequence Source"
+	private <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> iterator, TypeInformation<OUT>
+			typeInfo, String operatorName) {
+		return addSource(new FromIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo); // NOTE(review): no null/Serializable check here, unlike the public overload above - confirm
+	}
+
 	/**
 	 * Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
 	 * framework to create a parallel data stream source that returns the elements in the iterator. The iterator
@@ -636,13 +642,13 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT>
 			typeInfo) {
-		return fromParallelCollection(iterator, typeInfo, "Parallel Collection source");
+		return fromParallelCollection(iterator, typeInfo, "Parallel Collection Source");
 	}
 
 	// private helper for passing different names
 	private <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT>
 			typeInfo, String operatorName) {
-		return addSource(new FromIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo);
+		return addSource(new FromSplittableIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
index d46b1f2..125b88b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
@@ -17,9 +17,6 @@
 
 package org.apache.flink.streaming.api.functions.source;
 
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
 import java.util.Iterator;
 
 public class FromIteratorFunction<T> implements SourceFunction<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
new file mode 100644
index 0000000..fd86858
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.SplittableIterator;
+
+import java.util.Iterator;
+
+public class FromSplittableIteratorFunction<T> extends RichParallelSourceFunction<T> {
+	// Source function that, on each parallel subtask, emits the elements of one partition of a SplittableIterator.
+	private static final long serialVersionUID = 1L;
+
+	SplittableIterator<T> fullIterator; // the unsplit iterator handed to the constructor
+	Iterator<T> iterator; // this subtask's partition; assigned in open(), null until then
+
+	public FromSplittableIteratorFunction(SplittableIterator<T> iterator) {
+		this.fullIterator = iterator;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		int numberOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+		int indexofThisSubTask = getRuntimeContext().getIndexOfThisSubtask();
+		iterator = fullIterator.split(numberOfSubTasks)[indexofThisSubTask]; // split once per subtask, keep only the partition at this subtask's index
+	}
+
+	@Override
+	public boolean reachedEnd() throws Exception { // true once this subtask's partition has no more elements
+		return !iterator.hasNext();
+	}
+
+	@Override
+	public T next() throws Exception { // next element of this subtask's partition
+		return iterator.next();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
deleted file mode 100644
index 7d302d2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.NumberSequenceIterator;
-
-/**
- * Source Function used to generate the number sequence
- * 
- */
-public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
-
-	private static final long serialVersionUID = 1L;
-
-	private NumberSequenceIterator fullIterator;
-	private NumberSequenceIterator splitIterator;
-
-	public GenSequenceFunction(long from, long to) {
-		fullIterator = new NumberSequenceIterator(from, to);
-	}
-
-	@Override
-	public void open(Configuration config) {
-		int splitNumber = getRuntimeContext().getIndexOfThisSubtask();
-		int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
-		splitIterator = fullIterator.split(numOfSubTasks)[splitNumber];
-	}
-
-	@Override
-	public boolean reachedEnd() throws Exception {
-		return !splitIterator.hasNext();
-	}
-
-	@Override
-	public Long next() throws Exception {
-		return splitIterator.next();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
new file mode 100644
index 0000000..cbbd409
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.SplittableIterator;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertTrue;
+
+public class StreamExecutionEnvironmentTest {
+	// Checks, via the execution plan string, which parallelism the collection and sequence sources end up with.
+	private static final long MEMORYSIZE = 32; // second TestStreamEnvironment constructor argument; presumably a memory size, unit not visible here
+	private static int PARALLELISM = 4; // first TestStreamEnvironment constructor argument
+
+	@Test
+	public void testFromCollectionParallelism() {
+		TypeInformation<Object> typeInfo = TypeExtractor.getForClass(Object.class);
+		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+		boolean seenExpectedException = false;
+		// The test expects this chain to throw IllegalArgumentException (per the assertion message, when setting the parallelism).
+		try {
+			DataStream<Object> dataStream1 = env.fromCollection(new DummySplittableIterator(), typeInfo)
+					.setParallelism(4);
+		} catch (IllegalArgumentException e) {
+			seenExpectedException = true;
+		}
+		// The parallel variant is expected to accept the same parallelism without throwing.
+		DataStream<Object> dataStream2 = env.fromParallelCollection(new DummySplittableIterator(), typeInfo)
+				.setParallelism(4);
+
+		String plan = env.getExecutionPlan(); // only the plan is inspected; this test never executes the job
+
+		assertTrue("Expected Exception for setting parallelism was not thrown.", seenExpectedException);
+		assertTrue("Parallelism for dataStream1 is not right.",
+				plan.contains("\"contents\":\"Collection Source\",\"parallelism\":1"));
+		assertTrue("Parallelism for dataStream2 is not right.",
+				plan.contains("\"contents\":\"Parallel Collection Source\",\"parallelism\":4"));
+	}
+
+	@Test
+	public void testGenerateSequenceParallelism() throws Exception {
+		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+		boolean seenExpectedException = false;
+		// Same pattern as above: generateSequence(...).setParallelism(4) is expected to throw IllegalArgumentException.
+		try {
+			DataStream<Long> dataStream1 = env.generateSequence(0,0).setParallelism(4);
+		} catch (IllegalArgumentException e) {
+			seenExpectedException = true;
+		}
+		// generateParallelSequence is expected to accept the same parallelism without throwing.
+		DataStream<Long> dataStream2 = env.generateParallelSequence(0,0).setParallelism(4);
+
+		String plan = env.getExecutionPlan(); // only the plan is inspected; this test never executes the job
+
+		assertTrue("Expected Exception for setting parallelism was not thrown.", seenExpectedException);
+		assertTrue("Parallelism for dataStream1 is not right.",
+				plan.contains("\"contents\":\"Sequence Source\",\"parallelism\":1"));
+		assertTrue("Parallelism for dataStream2 is not right.",
+				plan.contains("\"contents\":\"Parallel Sequence Source\",\"parallelism\":4"));
+	}
+	// Stub iterator with no elements and no splits; the tests above pass it to the source factory methods only.
+	public static class DummySplittableIterator extends SplittableIterator {
+		private static final long serialVersionUID = 1312752876092210499L;
+
+		@Override
+		public Iterator[] split(int numPartitions) { // always an empty array, regardless of numPartitions
+			return new Iterator[0];
+		}
+
+		@Override
+		public int getMaximumNumberOfSplits() {
+			return 0;
+		}
+
+		@Override
+		public boolean hasNext() { // never has an element
+			return false;
+		}
+
+		@Override
+		public Object next() {
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index fc3e36f..56b6ae8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -101,7 +101,7 @@ public class DirectedOutputTest {
 		TestListResultSink<Long> evenAndOddSink = new TestListResultSink<Long>();
 		TestListResultSink<Long> allSink = new TestListResultSink<Long>();
 
-		SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
+		SplitDataStream<Long> source = env.generateParallelSequence(1, 11).split(new MyOutputSelector());
 		source.select(EVEN).addSink(evenSink);
 		source.select(ODD, TEN).addSink(oddAndTenSink);
 		source.select(EVEN, ODD).addSink(evenAndOddSink);

http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 908671d..d321851 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -222,8 +222,8 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		
 		env.setBufferTimeout(0);
 
-		DataStream<Long> sourceStream31 = env.generateSequence(1, 10000);
-		DataStream<Long> sourceStream32 = env.generateSequence(10001, 20000);
+		DataStream<Long> sourceStream31 = env.generateParallelSequence(1, 10000);
+		DataStream<Long> sourceStream32 = env.generateParallelSequence(10001, 20000);
 
 		sourceStream31.filter(new PrimeFilterFunction())
 				.window(Count.of(100))
@@ -308,7 +308,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		
 		env.setBufferTimeout(0);
 
-		DataStream<Long> dataStream51 = env.generateSequence(1, 5)
+		DataStream<Long> dataStream51 = env.generateParallelSequence(1, 5)
 				.map(new MapFunction<Long, Long>() {
 
 					@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
index d9cc607..c09afee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
@@ -92,7 +92,7 @@ public class ProjectTest implements Serializable {
 
 		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
 
-		env.generateSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
+		env.generateParallelSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
 				@Override
 				public Tuple3<Long, Character, Double> map(Long value) throws Exception {
 					return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());

http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
index 0bb1848..90cb7b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
@@ -157,7 +157,7 @@ public class StreamVertexTest {
 		StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE);
 
 		DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
-		DataStream<Long> generatedSequence = env.generateSequence(0, 3);
+		DataStream<Long> generatedSequence = env.generateParallelSequence(0, 3);
 
 		fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());