You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/10 15:30:45 UTC

[4/5] incubator-flink git commit: [FLINK-1161] [streaming] Streaming API type handling rework to support Java 8 lambdas

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 43b3993..b0ab99f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -23,9 +23,11 @@ import java.util.List;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
 import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
@@ -35,13 +37,12 @@ import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
 import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 
 /**
  * A {@link WindowedDataStream} represents a data stream that has been divided
@@ -225,8 +226,7 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream
 	 */
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) {
-		return dataStream.addFunction("NextGenWindowReduce", reduceFunction,
-				dataStream.outTypeWrapper, dataStream.outTypeWrapper,
+		return dataStream.addFunction("NextGenWindowReduce", reduceFunction, getType(), getType(),
 				getReduceInvokable(reduceFunction));
 	}
 
@@ -245,9 +245,13 @@ public class WindowedDataStream<OUT> {
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> reduceGroup(
 			GroupReduceFunction<OUT, R> reduceFunction) {
-		return dataStream.addFunction("NextGenWindowReduce", reduceFunction,
-				dataStream.outTypeWrapper, new FunctionTypeWrapper<R>(reduceFunction,
-						GroupReduceFunction.class, 1), getReduceGroupInvokable(reduceFunction));
+
+		TypeInformation<OUT> inType = getType();
+		TypeInformation<R> outType = TypeExtractor
+				.getGroupReduceReturnTypes(reduceFunction, inType);
+
+		return dataStream.addFunction("NextGenWindowReduce", reduceFunction, inType, outType,
+				getReduceGroupInvokable(reduceFunction));
 	}
 
 	/**
@@ -261,7 +265,7 @@ public class WindowedDataStream<OUT> {
 	public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
 		dataStream.checkFieldRange(positionToSum);
 		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(positionToSum,
-				dataStream.getClassAtPos(positionToSum), dataStream.getOutputType()));
+				dataStream.getClassAtPos(positionToSum), getType()));
 	}
 
 	/**
@@ -276,8 +280,7 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> sum(String field) {
-		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field,
-				getOutputType()));
+		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType()));
 	}
 
 	/**
@@ -290,7 +293,7 @@ public class WindowedDataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
 		dataStream.checkFieldRange(positionToMin);
-		return aggregate(ComparableAggregator.getAggregator(positionToMin, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(positionToMin, getType(),
 				AggregationType.MIN));
 	}
 
@@ -307,8 +310,8 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> min(String field) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
-				AggregationType.MIN, false));
+		return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MIN,
+				false));
 	}
 
 	/**
@@ -339,7 +342,7 @@ public class WindowedDataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
 		dataStream.checkFieldRange(positionToMinBy);
-		return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getType(),
 				AggregationType.MINBY, first));
 	}
 
@@ -359,7 +362,7 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(field, getType(),
 				AggregationType.MINBY, first));
 	}
 
@@ -373,7 +376,7 @@ public class WindowedDataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
 		dataStream.checkFieldRange(positionToMax);
-		return aggregate(ComparableAggregator.getAggregator(positionToMax, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(positionToMax, getType(),
 				AggregationType.MAX));
 	}
 
@@ -390,8 +393,8 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> max(String field) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
-				AggregationType.MAX, false));
+		return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MAX,
+				false));
 	}
 
 	/**
@@ -422,7 +425,7 @@ public class WindowedDataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
 		dataStream.checkFieldRange(positionToMaxBy);
-		return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getType(),
 				AggregationType.MAXBY, first));
 	}
 
@@ -442,7 +445,7 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(field, getType(),
 				AggregationType.MAXBY, first));
 	}
 
@@ -450,7 +453,7 @@ public class WindowedDataStream<OUT> {
 		StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregator);
 
 		SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("windowReduce",
-				aggregator, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable);
+				aggregator, getType(), getType(), invokable);
 
 		return returnStream;
 	}
@@ -576,8 +579,8 @@ public class WindowedDataStream<OUT> {
 	 * 
 	 * @return The output type.
 	 */
-	public TypeInformation<OUT> getOutputType() {
-		return dataStream.getOutputType();
+	public TypeInformation<OUT> getType() {
+		return dataStream.getType();
 	}
 
 	protected WindowedDataStream<OUT> copy() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 4f1efd1..5c47592 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -24,22 +24,21 @@ import java.util.List;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.function.source.FileSourceFunction;
 import org.apache.flink.streaming.api.function.source.FileStreamFunction;
 import org.apache.flink.streaming.api.function.source.FromElementsFunction;
 import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
+import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.invokable.SourceInvokable;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 /**
  * {@link ExecutionEnvironment} for streaming jobs. An instance of it is
@@ -131,12 +130,14 @@ public abstract class StreamExecutionEnvironment {
 	public long getBufferTimeout() {
 		return this.bufferTimeout;
 	}
-	
+
 	/**
-	 * Sets the default parallelism that will be used for the local execution environment created by
-	 * {@link #createLocalEnvironment()}.
+	 * Sets the default parallelism that will be used for the local execution
+	 * environment created by {@link #createLocalEnvironment()}.
 	 * 
-	 * @param degreeOfParallelism The degree of parallelism to use as the default local parallelism.
+	 * @param degreeOfParallelism
+	 *            The degree of parallelism to use as the default local
+	 *            parallelism.
 	 */
 	public static void setDefaultLocalParallelism(int degreeOfParallelism) {
 		defaultLocalDop = degreeOfParallelism;
@@ -210,14 +211,14 @@ public abstract class StreamExecutionEnvironment {
 					"fromElements needs at least one element as argument");
 		}
 
-		TypeWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data[0]);
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data[0]);
 		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
-				outTypeWrapper);
+				outTypeInfo);
 
 		try {
 			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
 			jobGraphBuilder.addStreamVertex(returnStream.getId(),
-					new SourceInvokable<OUT>(function), null, outTypeWrapper, "source",
+					new SourceInvokable<OUT>(function), null, outTypeInfo, "source",
 					SerializationUtils.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize elements");
@@ -246,16 +247,16 @@ public abstract class StreamExecutionEnvironment {
 			throw new IllegalArgumentException("Collection must not be empty");
 		}
 
-		TypeWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data.iterator().next());
-		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
-				outTypeWrapper);
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data.iterator().next());
+		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "collection",
+				outTypeInfo);
 
 		try {
 			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
 
 			jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(
-					new FromElementsFunction<OUT>(data)), null, new ObjectTypeWrapper<OUT>(data
-					.iterator().next()), "source", SerializationUtils.serialize(function), 1);
+					new FromElementsFunction<OUT>(data)), null, outTypeInfo, "source",
+					SerializationUtils.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize collection");
 		}
@@ -271,17 +272,16 @@ public abstract class StreamExecutionEnvironment {
 	 * @param hostname
 	 *            The host name which a server socket bind.
 	 * @param port
-	 * 			  The port number which a server socket bind. A port number of
-	 * 			  0 means that the port number is automatically allocated.
+	 *            The port number which a server socket bind. A port number of 0
+	 *            means that the port number is automatically allocated.
 	 * @param delimiter
-	 * 			  A character which split received strings into records.
+	 *            A character which split received strings into records.
 	 * @return A DataStream, containing the strings received from socket.
 	 */
 	public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
 		return addSource(new SocketTextStreamFunction(hostname, port, delimiter));
 	}
-	
-	
+
 	/**
 	 * Creates a new DataStream that contains the strings received infinitely
 	 * from socket. Received strings are decoded by the system's default
@@ -290,8 +290,8 @@ public abstract class StreamExecutionEnvironment {
 	 * @param hostname
 	 *            The host name which a server socket bind.
 	 * @param port
-	 * 			  The port number which a server socket bind. A port number of
-	 * 			  0 means that the port number is automatically allocated.
+	 *            The port number which a server socket bind. A port number of 0
+	 *            means that the port number is automatically allocated.
 	 * @return A DataStream, containing the strings received from socket.
 	 */
 	public DataStreamSource<String> socketTextStream(String hostname, int port) {
@@ -324,14 +324,14 @@ public abstract class StreamExecutionEnvironment {
 	 * @return the data stream constructed
 	 */
 	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
-		TypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(function,
-				SourceFunction.class, 0);
-		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source",
-				outTypeWrapper);
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class,
+				function.getClass(), 0, null, null);
+
+		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source", outTypeInfo);
 
 		try {
 			jobGraphBuilder.addStreamVertex(returnStream.getId(),
-					new SourceInvokable<OUT>(function), null, outTypeWrapper, "source",
+					new SourceInvokable<OUT>(function), null, outTypeInfo, "source",
 					SerializationUtils.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize SourceFunction");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
index f72f66e..4666a85 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -17,24 +17,25 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
 	transient OUT outTuple;
-	TypeWrapper<OUT> outTypeWrapper;
+	TypeSerializer<OUT> outTypeSerializer;
 	int[] fields;
 	int numFields;
 
-	public ProjectInvokable(int[] fields, TypeWrapper<OUT> outTypeWrapper) {
+	public ProjectInvokable(int[] fields, TypeInformation<OUT> outTypeInformation) {
 		super(null);
 		this.fields = fields;
 		this.numFields = this.fields.length;
-		this.outTypeWrapper = outTypeWrapper;
+		this.outTypeSerializer = outTypeInformation.createSerializer();
 	}
 
 	@Override
@@ -60,6 +61,6 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
 	@Override
 	public void open(Configuration config) throws Exception {
 		super.open(config);
-		outTuple = outTypeWrapper.getTypeInfo().createSerializer().createInstance();
+		outTuple = outTypeSerializer.createInstance();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index 2464ff2..0058c66 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.streamvertex;
 
 import java.util.ArrayList;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.io.network.api.MutableRecordReader;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
@@ -52,11 +51,8 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
 	}
 
 	private void setDeserializers() {
-		TypeInformation<IN1> inputTypeInfo1 = configuration.getTypeInfoIn1(userClassLoader);
-		inputDeserializer1 = new StreamRecordSerializer<IN1>(inputTypeInfo1);
-
-		TypeInformation<IN2> inputTypeInfo2 = configuration.getTypeInfoIn2(userClassLoader);
-		inputDeserializer2 = new StreamRecordSerializer<IN2>(inputTypeInfo2);
+		inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+		inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index 9d65a21..090c1a6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.streamvertex;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.MutableReader;
@@ -52,7 +51,7 @@ public class InputHandler<IN> {
 
 	@SuppressWarnings("unchecked")
 	protected void setConfigInputs() throws StreamVertexException {
-		setDeserializer();
+		inputSerializer = configuration.getTypeSerializerIn1(streamVertex.userClassLoader);
 
 		int numberOfInputs = configuration.getNumberOfInputs();
 		if (numberOfInputs > 0) {
@@ -74,14 +73,6 @@ public class InputHandler<IN> {
 		}
 	}
 
-	private void setDeserializer() {
-		TypeInformation<IN> inTupleTypeInfo = configuration
-				.getTypeInfoIn1(streamVertex.userClassLoader);
-		if (inTupleTypeInfo != null) {
-			inputSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo);
-		}
-	}
-
 	private MutableObjectIterator<StreamRecord<IN>> createInputIterator() {
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		final MutableObjectIterator<StreamRecord<IN>> iter = new ReaderIterator(inputs,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index cc67d6e..5381e22 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -98,9 +98,8 @@ public class OutputHandler<OUT> {
 	}
 
 	void setSerializers() {
-		outTypeInfo = configuration.getTypeInfoOut1(streamVertex.userClassLoader);
-		if (outTypeInfo != null) {
-			outSerializer = new StreamRecordSerializer<OUT>(outTypeInfo);
+		outSerializer = configuration.getTypeSerializerOut1(streamVertex.userClassLoader);
+		if (outSerializer != null) {
 			outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
 			outSerializationDelegate.setInstance(outSerializer.createInstance());
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ClassTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ClassTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ClassTypeWrapper.java
deleted file mode 100644
index 2839535..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ClassTypeWrapper.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.IOException;
-
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-public class ClassTypeWrapper<T> extends TypeWrapper<T> {
-	private static final long serialVersionUID = 1L;
-
-	private Class<T> clazz;
-
-	public ClassTypeWrapper(Class<T> clazz) {
-		this.clazz = clazz;
-		setTypeInfo();
-	}
-
-	private void readObject(java.io.ObjectInputStream in) throws IOException,
-			ClassNotFoundException {
-		in.defaultReadObject();
-		setTypeInfo();
-	}
-
-	@Override
-	protected void setTypeInfo() {
-		if (clazz != null) {
-			typeInfo = TypeExtractor.getForClass(clazz);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/CombineTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/CombineTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/CombineTypeWrapper.java
deleted file mode 100644
index b4cd65d..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/CombineTypeWrapper.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.IOException;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-
-public class CombineTypeWrapper<OUT1, OUT2> extends
-		TypeWrapper<Tuple2<OUT1, OUT2>> {
-
-	private static final long serialVersionUID = 1L;
-	// Info about OUT
-	private TypeWrapper<OUT1> outTypeWrapper1;
-	private TypeWrapper<OUT2> outTypeWrapper2;
-
-	public CombineTypeWrapper(TypeWrapper<OUT1> outTypeWrapper1,
-			TypeWrapper<OUT2> outTypeWrapper2) {
-		this.outTypeWrapper1 = outTypeWrapper1;
-		this.outTypeWrapper2 = outTypeWrapper2;
-	}
-
-	private void readObject(java.io.ObjectInputStream in) throws IOException,
-			ClassNotFoundException {
-		in.defaultReadObject();
-		setTypeInfo();
-	}
-
-	@Override
-	protected void setTypeInfo() {
-		typeInfo = new TupleTypeInfo<Tuple2<OUT1, OUT2>>(
-				outTypeWrapper1.getTypeInfo(), outTypeWrapper2.getTypeInfo());
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
deleted file mode 100644
index 4255912..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-public class FunctionTypeWrapper<T> extends TypeWrapper<T> {
-	private static final long serialVersionUID = 1L;
-
-	private Function function;
-	private Class<? extends Function> functionSuperClass;
-	private int typeParameterNumber;
-
-	public FunctionTypeWrapper(Function function, Class<? extends Function> functionSuperClass,
-			int typeParameterNumber) {
-		this.function = function;
-		this.functionSuperClass = functionSuperClass;
-		this.typeParameterNumber = typeParameterNumber;
-		setTypeInfo();
-	}
-
-	private void readObject(java.io.ObjectInputStream in) throws IOException,
-			ClassNotFoundException {
-		in.defaultReadObject();
-		setTypeInfo();
-	}
-
-	@Override
-	protected void setTypeInfo() {
-		if (typeParameterNumber != -1) {
-			typeInfo = TypeExtractor.createTypeInfo(functionSuperClass, function.getClass(),
-					typeParameterNumber, null, null);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
deleted file mode 100644
index 6bf90c4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.IOException;
-
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-public class ObjectTypeWrapper<T> extends
-		TypeWrapper<T> {
-	private static final long serialVersionUID = 1L;
-
-	private T instance;
-
-	public ObjectTypeWrapper(T instance) {
-		this.instance = instance;
-		setTypeInfo();
-	}
-
-	private void readObject(java.io.ObjectInputStream in) throws IOException,
-			ClassNotFoundException {
-		in.defaultReadObject();
-		setTypeInfo();
-	}
-
-	@Override
-	protected void setTypeInfo() {
-		if (instance != null) {
-			typeInfo = TypeExtractor.getForObject(instance);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java
deleted file mode 100644
index 9e8d4b4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-
-public class ProjectTypeWrapper<IN,OUT extends Tuple> extends
-		TypeWrapper<OUT> {
-	private static final long serialVersionUID = 1L;
-
-
-	private TypeWrapper<IN> inType;
-	Class<?>[] givenTypes;
-	int[] fields;
-
-	public ProjectTypeWrapper(TypeWrapper<IN> inType,int[] fields,Class<?>[] givenTypes) {
-		this.inType = inType;
-		this.givenTypes = givenTypes;
-		this.fields = fields;
-		setTypeInfo();
-	}
-
-	private void readObject(java.io.ObjectInputStream in) throws IOException,
-			ClassNotFoundException {
-		in.defaultReadObject();
-		setTypeInfo();
-	}
-
-	@Override
-	protected void setTypeInfo() {
-		TypeInformation<?>[] outTypes = extractFieldTypes();
-		this.typeInfo = new TupleTypeInfo<OUT>(outTypes);
-	}
-	
-	private TypeInformation<?>[] extractFieldTypes() {
-		
-		TupleTypeInfo<?> inTupleType = (TupleTypeInfo<?>) inType.getTypeInfo();
-		TypeInformation<?>[] fieldTypes = new TypeInformation[fields.length];
-				
-		for(int i=0; i<fields.length; i++) {
-			
-			if(inTupleType.getTypeAt(fields[i]).getTypeClass() != givenTypes[i]) {
-				throw new IllegalArgumentException("Given types do not match types of input data set.");
-			}
-				
-			fieldTypes[i] = inTupleType.getTypeAt(fields[i]);
-		}
-		
-		return fieldTypes;
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java
deleted file mode 100644
index a2e16b6..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public abstract class TypeWrapper<T>
-		implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	protected transient TypeInformation<T> typeInfo = null;
-	
-	public TypeInformation<T> getTypeInfo() {
-		if (typeInfo == null) {
-			throw new RuntimeException("There is no TypeInformation in the wrapper");
-		}
-		return typeInfo;
-	}
-
-	protected abstract void setTypeInfo();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
index 5157dcb..288d4ee 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
@@ -23,12 +23,12 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.StreamProjection;
 import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.ProjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 import org.junit.Test;
 
 public class ProjectTest implements Serializable {
@@ -37,17 +37,18 @@ public class ProjectTest implements Serializable {
 	@Test
 	public void test() {
 
-		TypeWrapper<Tuple5<Integer, String, Integer, String, Integer>> inTypeWrapper = new ObjectTypeWrapper<Tuple5<Integer, String, Integer, String, Integer>>(
-				new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));
+		TypeInformation<Tuple5<Integer, String, Integer, String, Integer>> inType = TypeExtractor
+				.getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b",
+						4));
 
 		int[] fields = new int[] { 4, 4, 3 };
 		Class<?>[] classes = new Class<?>[] { Integer.class, Integer.class, String.class };
 
-		TypeWrapper<Tuple3<Integer, Integer, String>> outTypeWrapper = new ProjectTypeWrapper<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
-				inTypeWrapper, fields, classes);
-
+		@SuppressWarnings("unchecked")
 		ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> invokable = new ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
-				fields, outTypeWrapper);
+				fields,
+				(TypeInformation<Tuple3<Integer, Integer, String>>) StreamProjection
+						.extractFieldTypes(fields, classes, inType));
 
 		List<Tuple5<Integer, String, Integer, String, Integer>> input = new ArrayList<Tuple5<Integer, String, Integer, String, Integer>>();
 		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
deleted file mode 100644
index 1d2fd2e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.junit.Test;
-
-public class TypeSerializationTest {
-
-	private static class MyMap extends RichMapFunction<Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String map(Integer value) throws Exception {
-			return null;
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void functionTypeSerializationTest() {
-		TypeWrapper<Integer> ser = new FunctionTypeWrapper<Integer>(new MyMap(),
-				RichMapFunction.class, 0);
-
-		byte[] serializedType = SerializationUtils.serialize(ser);
-
-		TypeWrapper<Integer> ser2 = (TypeWrapper<Integer>) SerializationUtils
-				.deserialize(serializedType);
-
-		assertNotNull(ser.getTypeInfo());
-		assertNotNull(ser2.getTypeInfo());
-
-		assertEquals(ser.getTypeInfo(), ser2.getTypeInfo());
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void objectTypeSerializationTest() {
-		Integer instance = Integer.valueOf(22);
-		
-		TypeWrapper<Integer> ser = new ObjectTypeWrapper<Integer>(instance);
-		
-		byte[] serializedType = SerializationUtils.serialize(ser);
-
-		TypeWrapper<Integer> ser2 = (TypeWrapper<Integer>) SerializationUtils
-				.deserialize(serializedType);
-
-		assertNotNull(ser.getTypeInfo());
-		assertNotNull(ser2.getTypeInfo());
-
-		assertEquals(ser.getTypeInfo(), ser2.getTypeInfo());
-	}
-}