You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/24 21:51:36 UTC

[03/12] [FLINK-1102] [streaming] Projection operator added to DataStream

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index c3231d7..e7a68d3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.invokable.SourceInvokable;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 /**
  * {@link ExecutionEnvironment} for streaming jobs. An instance of it is
@@ -224,7 +224,7 @@ public abstract class StreamExecutionEnvironment {
 					"fromElements needs at least one element as argument");
 		}
 
-		TypeSerializerWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data[0]);
+		TypeWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data[0]);
 		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
 				outTypeWrapper);
 
@@ -259,7 +259,7 @@ public abstract class StreamExecutionEnvironment {
 			throw new IllegalArgumentException("Collection must not be empty");
 		}
 
-		TypeSerializerWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data.iterator()
+		TypeWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data.iterator()
 				.next());
 		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
 				outTypeWrapper);
@@ -305,7 +305,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @return the data stream constructed
 	 */
 	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, int parallelism) {
-		TypeSerializerWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(function,
+		TypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(function,
 				SourceFunction.class, 0);
 		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source",
 				outTypeWrapper);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index b3cd57f..476c519 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -43,7 +43,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 	private static final Logger LOG = LoggerFactory.getLogger(StreamInvokable.class);
 
 	protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
-	protected StreamRecordSerializer<IN> serializer;
+	protected StreamRecordSerializer<IN> inSerializer;
 	protected StreamRecord<IN> reuse;
 	protected boolean isMutable;
 
@@ -73,8 +73,8 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 			StreamRecordSerializer<IN> serializer, boolean isMutable) {
 		this.collector = collector;
 		this.recordIterator = recordIterator;
-		this.serializer = serializer;
-		if(this.serializer != null){
+		this.inSerializer = serializer;
+		if(this.inSerializer != null){
 			this.reuse = serializer.createInstance();
 		}
 		this.isMutable = isMutable;
@@ -84,7 +84,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 	 * Re-initializes the object in which the next input record will be read in
 	 */
 	protected void resetReuse() {
-		this.reuse = serializer.createInstance();
+		this.reuse = inSerializer.createInstance();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index f306dac..1cfc2d2 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -117,7 +117,7 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
 	@Override
 	public void open(Configuration config) throws Exception {
 		super.open(config);
-		this.typeSerializer = serializer.getObjectSerializer();
+		this.typeSerializer = inSerializer.getObjectSerializer();
 	}
 
 	protected class StreamBatch implements Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
new file mode 100644
index 0000000..f72f66e
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.util.serialization.TypeWrapper;
+
+/**
+ * Projects every incoming record onto the field positions given at
+ * construction time and emits the selected values as a tuple.
+ *
+ * @param <IN>
+ *            type of the incoming records
+ * @param <OUT>
+ *            tuple type of the emitted records
+ */
+public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	// Output instance: created in open(), overwritten for every record.
+	transient OUT outTuple;
+	TypeWrapper<OUT> outTypeWrapper;
+	int[] fields;
+	int numFields;
+
+	/**
+	 * Creates a projection onto the given field positions.
+	 *
+	 * @param fields
+	 *            positions of the input fields to emit, in output order
+	 * @param outTypeWrapper
+	 *            wrapper holding the type information of the output tuple
+	 */
+	public ProjectInvokable(int[] fields, TypeWrapper<OUT> outTypeWrapper) {
+		super(null);
+		// Keep a private copy so that later changes to the caller's array
+		// cannot alter the projection.
+		this.fields = fields.clone();
+		this.numFields = this.fields.length;
+		this.outTypeWrapper = outTypeWrapper;
+	}
+
+	/** Delegates to {@link #mutableInvoke()}. */
+	@Override
+	protected void immutableInvoke() throws Exception {
+		mutableInvoke();
+	}
+
+	@Override
+	protected void mutableInvoke() throws Exception {
+		while ((reuse = recordIterator.next(reuse)) != null) {
+			callUserFunctionAndLogException();
+		}
+	}
+
+	/**
+	 * Copies the selected fields of the current record into the output tuple
+	 * and passes that tuple to the collector.
+	 */
+	@Override
+	protected void callUserFunction() throws Exception {
+		for (int i = 0; i < this.numFields; i++) {
+			outTuple.setField(reuse.getField(fields[i]), i);
+		}
+		// The same tuple instance is handed to the collector for every record.
+		collector.collect(outTuple);
+	}
+
+	/**
+	 * Creates the output tuple using the serializer of the output type.
+	 */
+	@Override
+	public void open(Configuration config) throws Exception {
+		super.open(config);
+		outTuple = outTypeWrapper.getTypeInfo().createSerializer().createInstance();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
index ea1a750..4255912 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
-public class FunctionTypeWrapper<T> extends TypeSerializerWrapper<T> {
+public class FunctionTypeWrapper<T> extends TypeWrapper<T> {
 	private static final long serialVersionUID = 1L;
 
 	private Function function;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
index 487c0a3..6bf90c4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 public class ObjectTypeWrapper<T> extends
-		TypeSerializerWrapper<T> {
+		TypeWrapper<T> {
 	private static final long serialVersionUID = 1L;
 
 	private T instance;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java
new file mode 100644
index 0000000..9e8d4b4
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+/**
+ * Type wrapper for the output of a projection: builds the tuple type that
+ * consists of the selected fields of an input tuple type.
+ *
+ * @param <IN>
+ *            type of the input records
+ * @param <OUT>
+ *            tuple type resulting from the projection
+ */
+public class ProjectTypeWrapper<IN,OUT extends Tuple> extends
+		TypeWrapper<OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private TypeWrapper<IN> inType;
+	Class<?>[] givenTypes;
+	int[] fields;
+
+	/**
+	 * Creates the wrapper and derives the output type information.
+	 *
+	 * @param inType
+	 *            wrapper of the input type; its type information is cast to
+	 *            {@link TupleTypeInfo}
+	 * @param fields
+	 *            positions of the projected fields in the input tuple
+	 * @param givenTypes
+	 *            expected class of each projected field, in the order of
+	 *            {@code fields}
+	 * @throws IllegalArgumentException
+	 *             if the number of given types differs from the number of
+	 *             fields, or a given type is not the class of the selected
+	 *             input field
+	 */
+	public ProjectTypeWrapper(TypeWrapper<IN> inType,int[] fields,Class<?>[] givenTypes) {
+		this.inType = inType;
+		this.givenTypes = givenTypes;
+		this.fields = fields;
+		setTypeInfo();
+	}
+
+	// typeInfo is a transient field of TypeWrapper, so it has to be rebuilt
+	// once the serialized fields have been restored.
+	private void readObject(java.io.ObjectInputStream in) throws IOException,
+			ClassNotFoundException {
+		in.defaultReadObject();
+		setTypeInfo();
+	}
+
+	@Override
+	protected void setTypeInfo() {
+		TypeInformation<?>[] outTypes = extractFieldTypes();
+		this.typeInfo = new TupleTypeInfo<OUT>(outTypes);
+	}
+
+	/**
+	 * Looks up the type information of every projected field in the input
+	 * tuple type and checks it against the given classes.
+	 */
+	private TypeInformation<?>[] extractFieldTypes() {
+		if (givenTypes.length != fields.length) {
+			throw new IllegalArgumentException(
+					"The number of given types must match the number of projected fields.");
+		}
+
+		TupleTypeInfo<?> inTupleType = (TupleTypeInfo<?>) inType.getTypeInfo();
+		TypeInformation<?>[] fieldTypes = new TypeInformation[fields.length];
+
+		for (int i = 0; i < fields.length; i++) {
+			TypeInformation<?> fieldType = inTupleType.getTypeAt(fields[i]);
+			if (fieldType.getTypeClass() != givenTypes[i]) {
+				throw new IllegalArgumentException("Given types do not match types of input data set.");
+			}
+			fieldTypes[i] = fieldType;
+		}
+
+		return fieldTypes;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
deleted file mode 100644
index ca7a9d2..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public abstract class TypeSerializerWrapper<T>
-		implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	protected transient TypeInformation<T> typeInfo = null;
-	
-	public TypeInformation<T> getTypeInfo() {
-		if (typeInfo == null) {
-			throw new RuntimeException("There is no TypeInformation in the wrapper");
-		}
-		return typeInfo;
-	}
-
-	protected abstract void setTypeInfo();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java
new file mode 100644
index 0000000..a2e16b6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * Serializable base class for objects that carry a {@link TypeInformation}.
+ * The type information is held in a transient field and is therefore not
+ * part of the serialized form.
+ *
+ * @param <T>
+ *            type described by the wrapped type information
+ */
+public abstract class TypeWrapper<T> implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	protected transient TypeInformation<T> typeInfo = null;
+
+	/**
+	 * Returns the wrapped type information.
+	 *
+	 * @return the type information, never {@code null}
+	 * @throws RuntimeException
+	 *             if no type information has been set
+	 */
+	public TypeInformation<T> getTypeInfo() {
+		if (typeInfo != null) {
+			return typeInfo;
+		}
+		throw new RuntimeException("There is no TypeInformation in the wrapper");
+	}
+
+	/**
+	 * Expected to be implemented so that it assigns {@link #typeInfo}.
+	 */
+	protected abstract void setTypeInfo();
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
new file mode 100644
index 0000000..5157dcb
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
+import org.apache.flink.streaming.util.serialization.ProjectTypeWrapper;
+import org.apache.flink.streaming.util.serialization.TypeWrapper;
+import org.junit.Test;
+
+/**
+ * Test of {@link ProjectInvokable} together with {@link ProjectTypeWrapper}.
+ */
+public class ProjectTest implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Projects Tuple5 records onto fields 4, 4 and 3 and compares the output
+	 * with the expected Tuple3 records.
+	 */
+	@Test
+	public void test() {
+		int[] projectedFields = new int[] { 4, 4, 3 };
+		Class<?>[] projectedTypes = new Class<?>[] { Integer.class, Integer.class, String.class };
+
+		TypeWrapper<Tuple5<Integer, String, Integer, String, Integer>> sourceType = new ObjectTypeWrapper<Tuple5<Integer, String, Integer, String, Integer>>(
+				source(2, "a", 3, "b", 4));
+		TypeWrapper<Tuple3<Integer, Integer, String>> resultType = new ProjectTypeWrapper<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
+				sourceType, projectedFields, projectedTypes);
+
+		ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> projector = new ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
+				projectedFields, resultType);
+
+		// Each input record is listed next to the output expected for it.
+		List<Tuple5<Integer, String, Integer, String, Integer>> records = new ArrayList<Tuple5<Integer, String, Integer, String, Integer>>();
+		List<Tuple3<Integer, Integer, String>> expectedRecords = new ArrayList<Tuple3<Integer, Integer, String>>();
+
+		records.add(source(2, "a", 3, "b", 4));
+		expectedRecords.add(result(4, 4, "b"));
+
+		records.add(source(2, "s", 3, "c", 2));
+		expectedRecords.add(result(2, 2, "c"));
+
+		records.add(source(2, "a", 3, "c", 2));
+		expectedRecords.add(result(2, 2, "c"));
+
+		records.add(source(2, "a", 3, "a", 7));
+		expectedRecords.add(result(7, 7, "a"));
+
+		assertEquals(expectedRecords, MockInvokable.createAndExecute(projector, records));
+	}
+
+	private static Tuple5<Integer, String, Integer, String, Integer> source(Integer f0, String f1,
+			Integer f2, String f3, Integer f4) {
+		return new Tuple5<Integer, String, Integer, String, Integer>(f0, f1, f2, f3, f4);
+	}
+
+	private static Tuple3<Integer, Integer, String> result(Integer f0, Integer f1, String f2) {
+		return new Tuple3<Integer, Integer, String>(f0, f1, f2);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index f470c76..0119f04 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -116,6 +116,11 @@ public class StreamVertexTest {
 			fail();
 		} catch (IllegalArgumentException e) {
 		}
+		try {
+			env.generateSequence(1, 10).project(2);
+			fail();
+		} catch (RuntimeException e) {
+		}
 
 		try {
 			env.readTextFile("random/path/that/is/not/valid");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
index c4682dd..e8b96c5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
@@ -17,20 +17,24 @@
 
 package org.apache.flink.streaming.util;
 
+import java.io.Serializable;
 import java.util.Collection;
 
+import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.util.Collector;
 
 public class MockCollector<T> implements Collector<T> {
 	private Collection<T> outputs;
-	
+
 	public MockCollector(Collection<T> outputs) {
 		this.outputs = outputs;
 	}
 
 	@Override
 	public void collect(T record) {
-		outputs.add(record);
+		T copied = SerializationUtils.deserialize(SerializationUtils
+				.serialize((Serializable) record));
+		outputs.add(copied);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
index d9cde4f..8795126 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
@@ -38,12 +38,12 @@ public class TypeSerializationTest {
 	@SuppressWarnings("unchecked")
 	@Test
 	public void functionTypeSerializationTest() {
-		TypeSerializerWrapper<Integer> ser = new FunctionTypeWrapper<Integer>(new MyMap(),
+		TypeWrapper<Integer> ser = new FunctionTypeWrapper<Integer>(new MyMap(),
 				RichMapFunction.class, 0);
 
 		byte[] serializedType = SerializationUtils.serialize(ser);
 
-		TypeSerializerWrapper<Integer> ser2 = (TypeSerializerWrapper<Integer>) SerializationUtils
+		TypeWrapper<Integer> ser2 = (TypeWrapper<Integer>) SerializationUtils
 				.deserialize(serializedType);
 
 		assertNotNull(ser.getTypeInfo());
@@ -57,11 +57,11 @@ public class TypeSerializationTest {
 	public void objectTypeSerializationTest() {
 		Integer instance = new Integer(22);
 		
-		TypeSerializerWrapper<Integer> ser = new ObjectTypeWrapper<Integer>(instance);
+		TypeWrapper<Integer> ser = new ObjectTypeWrapper<Integer>(instance);
 		
 		byte[] serializedType = SerializationUtils.serialize(ser);
 
-		TypeSerializerWrapper<Integer> ser2 = (TypeSerializerWrapper<Integer>) SerializationUtils
+		TypeWrapper<Integer> ser2 = (TypeWrapper<Integer>) SerializationUtils
 				.deserialize(serializedType);
 
 		assertNotNull(ser.getTypeInfo());