You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:04:01 UTC

[28/28] git commit: [streaming] Added MockCoInvokable, replaced environments with it in CoFunction tests

[streaming] Added MockCoInvokable, replaced environments with it in CoFunction tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4207c5fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4207c5fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4207c5fb

Branch: refs/heads/master
Commit: 4207c5fb578555f093be7ad00afb8cc7dbcee8f5
Parents: 82aa005
Author: ghermann <re...@gmail.com>
Authored: Tue Aug 26 15:36:25 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:57 2014 +0200

----------------------------------------------------------------------
 .../operator/co/CoFlatMapInvokable.java         |  12 ++
 .../operator/co/CoGroupReduceInvokable.java     |  12 ++
 .../invokable/operator/co/CoMapInvokable.java   |  12 ++
 .../operator/co/CoReduceInvokable.java          |  12 ++
 .../flink/streaming/io/CoReaderIterator.java    |   6 +-
 .../flink/streaming/util/MockCollector.java     |  39 -----
 .../flink/streaming/util/MockInvokable.java     | 104 ------------
 .../apache/flink/streaming/util/MockSource.java |  36 ----
 .../api/invokable/operator/CoFlatMapTest.java   |  59 ++-----
 .../invokable/operator/CoGroupReduceTest.java   | 122 ++++----------
 .../api/invokable/operator/CoMapTest.java       |  71 ++------
 .../flink/streaming/util/MockCoInvokable.java   | 168 +++++++++++++++++++
 .../flink/streaming/util/MockCollector.java     |  39 +++++
 .../flink/streaming/util/MockInvokable.java     | 104 ++++++++++++
 .../apache/flink/streaming/util/MockSource.java |  36 ++++
 15 files changed, 466 insertions(+), 366 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
index 2d5262d..bde9368 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
@@ -55,4 +55,16 @@ public class CoFlatMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT
 		}
 	}
 
+	@Override
+	protected void coUSerFunction1() throws Exception {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	protected void coUserFunction2() throws Exception {
+		// TODO Auto-generated method stub
+		
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
index fb1d5ec..e5f4059 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
@@ -70,4 +70,16 @@ public class CoGroupReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
 		}
 	}
 
+	@Override
+	protected void coUSerFunction1() throws Exception {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	protected void coUserFunction2() throws Exception {
+		// TODO Auto-generated method stub
+		
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
index 85f3ca6..6179d2e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
@@ -55,4 +55,16 @@ public class CoMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
 		}
 	}
 
+	@Override
+	protected void coUSerFunction1() throws Exception {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	protected void coUserFunction2() throws Exception {
+		// TODO Auto-generated method stub
+		
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
index c9a313f..4a4a280 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
@@ -55,4 +55,16 @@ public class CoReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT>
 		}
 	}
 
+	@Override
+	protected void coUSerFunction1() throws Exception {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	protected void coUserFunction2() throws Exception {
+		// TODO Auto-generated method stub
+		
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
index 729f75d..a6be8fc 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
@@ -26,13 +26,13 @@ import org.apache.flink.runtime.plugable.DeserializationDelegate;
  * A CoReaderIterator wraps a {@link CoRecordReader} producing records of two
  * input types.
  */
-public final class CoReaderIterator<T1, T2> {
+public class CoReaderIterator<T1, T2> {
 
 	private final CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader; // the
 																									// source
 
-	private final DeserializationDelegate<T1> delegate1;
-	private final DeserializationDelegate<T2> delegate2;
+	protected final DeserializationDelegate<T1> delegate1;
+	protected final DeserializationDelegate<T2> delegate2;
 
 	public CoReaderIterator(
 			CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockCollector.java
deleted file mode 100644
index 984fb6d..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockCollector.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.Collection;
-
-import org.apache.flink.util.Collector;
-
-public class MockCollector<T> implements Collector<T> {
-	private Collection<T> outputs;
-	
-	public MockCollector(Collection<T> outputs) {
-		this.outputs = outputs;
-	}
-
-	@Override
-	public void collect(T record) {
-		outputs.add(record);
-	}
-
-	@Override
-	public void close() {
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockInvokable.java
deleted file mode 100644
index 1ea78e1..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockInvokable.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.types.TypeInformation;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class MockInvokable<IN, OUT> {
-	private Collection<IN> inputs;
-	private List<OUT> outputs;
-
-	private Collector<OUT> collector;
-	private StreamRecordSerializer<IN> inDeserializer;
-	private MutableObjectIterator<StreamRecord<IN>> iterator;
-
-	public MockInvokable(Collection<IN> inputs) {
-		this.inputs = inputs;
-		if (inputs.isEmpty()) {
-			throw new RuntimeException("Inputs must not be empty");
-		}
-
-		TypeInformation<IN> inTypeInfo = TypeExtractor.getForObject(inputs.iterator().next());
-		inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo);
-		
-		iterator = new MockInputIterator();
-		outputs = new ArrayList<OUT>();
-		collector = new MockCollector<OUT>(outputs);
-	}
-
-
-	private class MockInputIterator implements MutableObjectIterator<StreamRecord<IN>> {
-		Iterator<IN> listIterator;
-		
-		public MockInputIterator() {
-			listIterator = inputs.iterator();
-		}
-
-		@Override
-		public StreamRecord<IN> next(StreamRecord<IN> reuse) throws IOException {
-			if (listIterator.hasNext()) {
-				reuse.setObject(listIterator.next());
-			} else {
-				reuse = null;
-			}
-			return reuse;
-		}
-	}
-
-	public List<OUT> getOutputs() {
-		return outputs;
-	}
-
-	public Collector<OUT> getCollector() {
-		return collector;
-	}
-
-	public StreamRecordSerializer<IN> getInDeserializer() {
-		return inDeserializer;
-	}
-
-	public MutableObjectIterator<StreamRecord<IN>> getIterator() {
-		return iterator;
-	}
-
-	public static <IN, OUT> List<OUT> createAndExecute(UserTaskInvokable<IN, OUT> invokable, List<IN> inputs) {
-		MockInvokable<IN, OUT> mock = new MockInvokable<IN, OUT>(inputs);
-		invokable.initialize(mock.getCollector(), mock.getIterator(), mock.getInDeserializer(), false);
-		try {
-			invokable.open(null);
-			invokable.invoke();
-		} catch (Exception e) {
-			throw new RuntimeException("Cannot invoke invokable.", e);
-		}
-		
-		return mock.getOutputs();
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockSource.java
deleted file mode 100644
index 92b9e42..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockSource.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-
-public class MockSource<T> {
-
-	public static <T> List<T> createAndExecute(SourceFunction<T> source) {
-		List<T> outputs = new ArrayList<T>();
-		try {
-			source.invoke(new MockCollector<T>(outputs));
-		} catch (Exception e) {
-			throw new RuntimeException("Cannot invoke source.", e);
-		}
-		return outputs;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
index e713f88..b31b39c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
@@ -17,37 +17,27 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
 import org.apache.flink.streaming.util.LogUtils;
+import org.apache.flink.streaming.util.MockCoInvokable;
 import org.apache.flink.util.Collector;
 import org.apache.log4j.Level;
-import org.junit.Assert;
 import org.junit.Test;
 
 public class CoFlatMapTest implements Serializable {
 	private static final long serialVersionUID = 1L;
 
-	private static Set<String> result;
-	private static Set<String> expected = new HashSet<String>();
-
-	private final static class EmptySink implements SinkFunction<String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(String tuple) {
-		}
-	}
-
 	private final static class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> {
 		private static final long serialVersionUID = 1L;
 
@@ -55,37 +45,32 @@ public class CoFlatMapTest implements Serializable {
 		public void flatMap1(String value, Collector<String> coll) {
 			for (int i = 0; i < value.length(); i++) {
 				coll.collect(value.substring(i, i + 1));
-				result.add(value.substring(i, i + 1));
 			}
 		}
 
 		@Override
 		public void flatMap2(Integer value, Collector<String> coll) {
 			coll.collect(value.toString());
-			result.add(value.toString());
 		}
 	}
 
+	@Test
+	public void coFlatMapTest() {
+		CoFlatMapInvokable<String, Integer, String> invokable = new CoFlatMapInvokable<String, Integer, String>(
+				new MyCoFlatMap());
+
+		List<String> expectedList = Arrays.asList("a", "b", "c", "1", "d", "e", "f", "2", "g", "h",
+				"e", "3", "4", "5");
+		List<String> actualList = MockCoInvokable.createAndExecute(invokable,
+				Arrays.asList("abc", "def", "ghe"), Arrays.asList(1, 2, 3, 4, 5));
+
+		assertEquals(expectedList, actualList);
+	}
+
 	@SuppressWarnings("unchecked")
 	@Test
 	public void multipleInputTest() {
 		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
-		expected.add("a");
-		expected.add("b");
-		expected.add("c");
-		expected.add("d");
-		expected.add("e");
-		expected.add("f");
-		expected.add("g");
-		expected.add("h");
-		expected.add("e");
-		expected.add("1");
-		expected.add("2");
-		expected.add("3");
-		expected.add("4");
-		expected.add("5");
-
-		result = new HashSet<String>();
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
@@ -98,13 +83,5 @@ public class CoFlatMapTest implements Serializable {
 		} catch (RuntimeException e) {
 			// good
 		}
-
-		@SuppressWarnings({ "unused" })
-		DataStream<String> ds4 = env.fromElements("abc", "def", "ghe").connect(ds2)
-				.flatMap(new MyCoFlatMap()).addSink(new EmptySink());
-
-		env.executeTest(32);
-
-		Assert.assertEquals(expected, result);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
index 1c91cc0..e1d10b8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
@@ -17,34 +17,20 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
-import java.util.ArrayList;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.util.LogUtils;
-import org.apache.log4j.Level;
-import org.junit.Assert;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupReduceInvokable;
+import org.apache.flink.streaming.util.MockCoInvokable;
 import org.junit.Test;
 
 public class CoGroupReduceTest {
-
-	private static List<String> result;
-	private static List<String> expected = new ArrayList<String>();
-
-	private final static class EmptySink implements SinkFunction<String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(String tuple) {
-		}
-	}
-
+	
 	private final static class MyCoReduceFunction implements
 			CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> {
 		private static final long serialVersionUID = 1L;
@@ -63,65 +49,18 @@ public class CoGroupReduceTest {
 
 		@Override
 		public String map1(Tuple3<String, String, String> value) {
-			String mapResult = value.f1;
-			result.add(mapResult);
-			return mapResult;
+			return value.f1;
 		}
 
 		@Override
 		public String map2(Tuple2<Integer, Integer> value) {
-			String mapResult = value.f1.toString();
-			result.add(mapResult);
-			return mapResult;
+			return value.f1.toString();
 		}
 	}
 
+	@SuppressWarnings("unchecked")
 	@Test
-	public void multipleInputTest() {
-		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
-		expected.add("word1word3");
-		expected.add("word2");
-		expected.add("3");
-		expected.add("5");
-		expected.add("7");
-		Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
-		Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
-		Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");
-		Tuple2<Integer, Integer> int1 = new Tuple2<Integer, Integer>(2, 1);
-		Tuple2<Integer, Integer> int2 = new Tuple2<Integer, Integer>(1, 2);
-		Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3);
-		Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4);
-		Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
-
-		result = new ArrayList<String>();
-
-		LocalStreamEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		@SuppressWarnings("unchecked")
-		DataStream<Tuple2<Integer, Integer>> ds1 = env1.fromElements(int1, int3, int5);
-		@SuppressWarnings("unchecked")
-		DataStream<Tuple2<Integer, Integer>> ds2 = env1.fromElements(int2, int4).merge(ds1);
-
-		@SuppressWarnings({ "unused", "unchecked" })
-		DataStream<String> ds4 = env1.fromElements(word1, word2, word3).connect(ds2).groupBy(0, 0)
-				.reduce(new MyCoReduceFunction()).addSink(new EmptySink());
-
-		env1.executeTest(32);
-
-		Assert.assertEquals(result.size(), 8);
-		Assert.assertTrue(result.containsAll(expected));
-	}
-
-	@Test
-	public void multipleInputTest2() {
-		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
-		expected.clear();
-		result.clear();
-		expected.add("word2word3");
-		expected.add("word1");
-		expected.add("3");
-		expected.add("5");
-		expected.add("7");
+	public void coFlatMapTest() {
 		Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
 		Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
 		Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");
@@ -130,23 +69,28 @@ public class CoGroupReduceTest {
 		Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3);
 		Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4);
 		Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
-
-		result = new ArrayList<String>();
-
-		LocalStreamEnvironment env2 = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		@SuppressWarnings("unchecked")
-		DataStream<Tuple2<Integer, Integer>> ds1 = env2.fromElements(int1, int3, int5);
-		@SuppressWarnings("unchecked")
-		DataStream<Tuple2<Integer, Integer>> ds2 = env2.fromElements(int2, int4).merge(ds1);
-
-		@SuppressWarnings({ "unused", "unchecked" })
-		DataStream<String> ds4 = env2.fromElements(word1, word2, word3).connect(ds2).groupBy(2, 0)
-				.reduce(new MyCoReduceFunction()).addSink(new EmptySink());
-
-		env2.executeTest(32);
-
-		Assert.assertEquals(result.size(), 8);
-		Assert.assertTrue(result.containsAll(expected));
+		
+		CoGroupReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoGroupReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
+				new MyCoReduceFunction(), 0, 0);
+
+		List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
+				"7");
+
+		List<String> actualList = MockCoInvokable.createAndExecute(invokable,
+				Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
+
+		assertEquals(expected, actualList);
+		
+	
+		invokable = new CoGroupReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
+				new MyCoReduceFunction(), 2, 0);
+
+		expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5",
+				"7");
+		
+		actualList = MockCoInvokable.createAndExecute(invokable,
+				Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
+		
+		assertEquals(expected, actualList);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
index e24362e..c47d086 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -17,78 +17,41 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.Arrays;
+import java.util.List;
 
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.util.LogUtils;
-
-import org.junit.Assert;
+import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
+import org.apache.flink.streaming.util.MockCoInvokable;
 import org.junit.Test;
 
 public class CoMapTest implements Serializable {
 	private static final long serialVersionUID = 1L;
 
-	private static Set<String> result;
-	private static Set<String> expected = new HashSet<String>();
-
-	private final static class EmptySink implements SinkFunction<Boolean> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Boolean tuple) {
-		}
-	}
-
-	private final static class MyCoMap implements CoMapFunction<String, Integer, Boolean> {
+	private final static class MyCoMap implements CoMapFunction<Double, Integer, String> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public Boolean map1(String value) {
-			result.add(value);
-			return true;
+		public String map1(Double value) {
+			return value.toString();
 		}
 
 		@Override
-		public Boolean map2(Integer value) {
-			result.add(value.toString());
-			return false;
+		public String map2(Integer value) {
+			return value.toString();
 		}
 	}
 
 	@Test
-	public void multipleInputTest() {
-		
-		LogUtils.initializeDefaultTestConsoleLogger();
-		
-		expected.add("a");
-		expected.add("b");
-		expected.add("c");
-		expected.add("1");
-		expected.add("2");
-		expected.add("3");
-		expected.add("4");
-
-		result = new HashSet<String>();
-
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+	public void coMapTest() {
+		CoMapInvokable<Double, Integer, String> invokable = new CoMapInvokable<Double, Integer, String>(new MyCoMap());
 
-		DataStream<Integer> ds1 = env.fromElements(1, 3);
-		@SuppressWarnings("unchecked")
-		DataStream<Integer> ds2 = env.fromElements(2, 4).merge(ds1);
-
-		DataStream<String> ds3 = env.fromElements("a", "b");
-
-		@SuppressWarnings({ "unused", "unchecked" })
-		DataStream<Boolean> ds4 = env.fromElements("c").merge(ds3).connect(ds2).map(new MyCoMap())
-				.addSink(new EmptySink());
-
-		env.executeTest(32);
-		Assert.assertEquals(expected, result);
+		List<String> expectedList = Arrays.asList("1.1", "1", "1.2", "2", "1.3", "3", "1.4", "1.5");
+		List<String> actualList = MockCoInvokable.createAndExecute(invokable, Arrays.asList(1.1, 1.2, 1.3, 1.4, 1.5), Arrays.asList(1, 2, 3));
+		
+		assertEquals(expectedList, actualList);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
new file mode 100644
index 0000000..2215fcd
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.io.CoReaderIterator;
+import org.apache.flink.types.TypeInformation;
+import org.apache.flink.util.Collector;
+
+public class MockCoInvokable<IN1, IN2, OUT> {
+	// private Collection<IN1> input1;
+	// private Collection<IN2> input2;
+	private Iterator<IN1> inputIterator1;
+	private Iterator<IN2> inputIterator2;
+	private List<OUT> outputs;
+
+	private Collector<OUT> collector;
+	private StreamRecordSerializer<IN1> inDeserializer1;
+	private CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> mockIterator;
+	private StreamRecordSerializer<IN2> inDeserializer2;
+
+	public MockCoInvokable(Collection<IN1> input1, Collection<IN2> input2) {
+
+		if (input1.isEmpty() || input2.isEmpty()) {
+			throw new RuntimeException("Inputs must not be empty");
+		}
+
+		this.inputIterator1 = input1.iterator();
+		this.inputIterator2 = input2.iterator();
+
+		TypeInformation<IN1> inTypeInfo1 = TypeExtractor.getForObject(input1.iterator().next());
+		inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1);
+		TypeInformation<IN2> inTypeInfo2 = TypeExtractor.getForObject(input2.iterator().next());
+		inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2);
+
+		mockIterator = new MockCoReaderIterator(inDeserializer1, inDeserializer2);
+
+		outputs = new ArrayList<OUT>();
+		collector = new MockCollector<OUT>(outputs);
+	}
+
+	private int currentInput = 1;
+	private StreamRecord<IN1> reuse1;
+	private StreamRecord<IN2> reuse2;
+	
+	private class MockCoReaderIterator extends
+			CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> {
+
+		public MockCoReaderIterator(
+				TypeSerializer<StreamRecord<IN1>> serializer1,
+				TypeSerializer<StreamRecord<IN2>> serializer2) {
+			super(null, serializer1, serializer2);
+			reuse1 = inDeserializer1.createInstance();
+			reuse2 = inDeserializer2.createInstance();
+		}
+
+		@Override
+		public int next(StreamRecord<IN1> target1, StreamRecord<IN2> target2) throws IOException {
+			this.delegate1.setInstance(target1);
+			this.delegate2.setInstance(target2);
+			
+			int inputNumber = nextRecord();
+			target1.setObject(reuse1.getObject());
+			target2.setObject(reuse2.getObject());
+			
+			return inputNumber;
+		}
+	}
+
+	private Integer nextRecord() {
+		if (inputIterator1.hasNext() && inputIterator2.hasNext()) {
+			switch (currentInput) {
+			case 1:
+				return next1();
+			case 2:
+				return next2();
+			default:
+				return 0;
+			}
+		}
+
+		if (inputIterator1.hasNext()) {
+			return next1();
+		}
+
+		if (inputIterator2.hasNext()) {
+			return next2();
+		}
+
+		return 0;
+	}
+
+	private int next1() {
+		reuse1 = inDeserializer1.createInstance();
+		reuse1.setObject(inputIterator1.next());
+		currentInput = 2;
+		return 1;
+	}
+
+	private int next2() {
+		reuse2 = inDeserializer2.createInstance();
+		reuse2.setObject(inputIterator2.next());
+		currentInput = 1;
+		return 2;
+	}
+
+	public List<OUT> getOutputs() {
+		return outputs;
+	}
+
+	public Collector<OUT> getCollector() {
+		return collector;
+	}
+
+	public StreamRecordSerializer<IN1> getInDeserializer1() {
+		return inDeserializer1;
+	}
+
+	public StreamRecordSerializer<IN2> getInDeserializer2() {
+		return inDeserializer2;
+	}
+
+	public CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> getIterator() {
+		return mockIterator;
+	}
+
+	public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoInvokable<IN1, IN2, OUT> invokable,
+			List<IN1> input1, List<IN2> input2) {
+		MockCoInvokable<IN1, IN2, OUT> mock = new MockCoInvokable<IN1, IN2, OUT>(input1, input2);
+		invokable.initialize(mock.getCollector(), mock.getIterator(), mock.getInDeserializer1(),
+				mock.getInDeserializer2(), false);
+
+		try {
+			invokable.open(null);
+			invokable.invoke();
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot invoke invokable.", e);
+		}
+
+		return mock.getOutputs();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
new file mode 100644
index 0000000..984fb6d
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.util.Collection;
+
+import org.apache.flink.util.Collector;
+
+public class MockCollector<T> implements Collector<T> {
+	private Collection<T> outputs;
+	
+	public MockCollector(Collection<T> outputs) {
+		this.outputs = outputs;
+	}
+
+	@Override
+	public void collect(T record) {
+		outputs.add(record);
+	}
+
+	@Override
+	public void close() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
new file mode 100644
index 0000000..1ea78e1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.types.TypeInformation;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class MockInvokable<IN, OUT> {
+	private Collection<IN> inputs;
+	private List<OUT> outputs;
+
+	private Collector<OUT> collector;
+	private StreamRecordSerializer<IN> inDeserializer;
+	private MutableObjectIterator<StreamRecord<IN>> iterator;
+
+	public MockInvokable(Collection<IN> inputs) {
+		this.inputs = inputs;
+		if (inputs.isEmpty()) {
+			throw new RuntimeException("Inputs must not be empty");
+		}
+
+		TypeInformation<IN> inTypeInfo = TypeExtractor.getForObject(inputs.iterator().next());
+		inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo);
+		
+		iterator = new MockInputIterator();
+		outputs = new ArrayList<OUT>();
+		collector = new MockCollector<OUT>(outputs);
+	}
+
+
+	private class MockInputIterator implements MutableObjectIterator<StreamRecord<IN>> {
+		Iterator<IN> listIterator;
+		
+		public MockInputIterator() {
+			listIterator = inputs.iterator();
+		}
+
+		@Override
+		public StreamRecord<IN> next(StreamRecord<IN> reuse) throws IOException {
+			if (listIterator.hasNext()) {
+				reuse.setObject(listIterator.next());
+			} else {
+				reuse = null;
+			}
+			return reuse;
+		}
+	}
+
+	public List<OUT> getOutputs() {
+		return outputs;
+	}
+
+	public Collector<OUT> getCollector() {
+		return collector;
+	}
+
+	public StreamRecordSerializer<IN> getInDeserializer() {
+		return inDeserializer;
+	}
+
+	public MutableObjectIterator<StreamRecord<IN>> getIterator() {
+		return iterator;
+	}
+
+	public static <IN, OUT> List<OUT> createAndExecute(UserTaskInvokable<IN, OUT> invokable, List<IN> inputs) {
+		MockInvokable<IN, OUT> mock = new MockInvokable<IN, OUT>(inputs);
+		invokable.initialize(mock.getCollector(), mock.getIterator(), mock.getInDeserializer(), false);
+		try {
+			invokable.open(null);
+			invokable.invoke();
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot invoke invokable.", e);
+		}
+		
+		return mock.getOutputs();
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
new file mode 100644
index 0000000..92b9e42
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+
+public class MockSource<T> {
+
+	public static <T> List<T> createAndExecute(SourceFunction<T> source) {
+		List<T> outputs = new ArrayList<T>();
+		try {
+			source.invoke(new MockCollector<T>(outputs));
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot invoke source.", e);
+		}
+		return outputs;
+	}
+}