You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:43 UTC

[10/28] [streaming] Refactored stream components with InputHandler & OutputHandler

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
deleted file mode 100644
index 0c042bc..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.MutableReader;
-import org.apache.flink.runtime.io.network.api.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.types.TypeInformation;
-import org.apache.flink.util.MutableObjectIterator;
-
-public abstract class SingleInputAbstractStreamComponent<IN, OUT> extends
-		AbstractStreamComponent<OUT> {
-
-	protected StreamRecordSerializer<IN> inputSerializer = null;
-	protected MutableObjectIterator<StreamRecord<IN>> inputIter;
-	protected MutableReader<IOReadableWritable> inputs;
-
-	protected void setDeserializers() {
-		if (functionName.equals(SOURCE)) {
-			setSerializer();
-		} else {
-			setDeserializer();
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	private void setDeserializer() {
-		TypeInformation<IN> inTupleTypeInfo = (TypeInformation<IN>) typeWrapper
-				.getInputTypeInfo1();
-		inputSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo);
-	}
-
-	@SuppressWarnings("unchecked")
-	protected void setSinkSerializer() {
-		try {
-			TypeInformation<IN> inputTypeInfo = (TypeInformation<IN>) typeWrapper
-					.getOutputTypeInfo();
-			inputSerializer = new StreamRecordSerializer<IN>(inputTypeInfo);
-		} catch (RuntimeException e) {
-			// User implemented sink, nothing to do
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	protected void setConfigInputs() throws StreamComponentException {
-		setDeserializers();
-
-		int numberOfInputs = configuration.getNumberOfInputs();
-
-		if (numberOfInputs < 2) {
-
-			inputs = new MutableRecordReader<IOReadableWritable>(this);
-
-		} else {
-			MutableRecordReader<IOReadableWritable>[] recordReaders = (MutableRecordReader<IOReadableWritable>[]) new MutableRecordReader<?>[numberOfInputs];
-
-			for (int i = 0; i < numberOfInputs; i++) {
-				recordReaders[i] = new MutableRecordReader<IOReadableWritable>(this);
-			}
-			inputs = new MutableUnionRecordReader<IOReadableWritable>(recordReaders);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index 069e846..715d0cd 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -29,10 +29,12 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.util.StringUtils;
 
 public class StreamIterationSink<IN extends Tuple> extends
-		SingleInputAbstractStreamComponent<IN, IN> {
+		AbstractStreamComponent {
 
 	private static final Log LOG = LogFactory.getLog(StreamIterationSink.class);
 
+	private InputHandler<IN> inputHandler;
+	
 	private String iterationId;
 	@SuppressWarnings("rawtypes")
 	private BlockingQueue<StreamRecord> dataChannel;
@@ -45,16 +47,12 @@ public class StreamIterationSink<IN extends Tuple> extends
 	@Override
 	public void setInputsOutputs() {
 		try {
-			setConfigInputs();
-			setSinkSerializer();
-
-			inputIter = createInputIterator(inputs, inputSerializer);
+			inputHandler = new InputHandler<IN>(this);
 
 			iterationId = configuration.getIterationId();
 			iterationWaitTime = configuration.getIterationWaitTime();
 			shouldWait = iterationWaitTime > 0;
 			dataChannel = BlockingQueueBroker.instance().get(iterationId);
-
 		} catch (Exception e) {
 			throw new StreamComponentException(String.format(
 					"Cannot register inputs of StreamIterationSink %s", iterationId), e);
@@ -64,24 +62,24 @@ public class StreamIterationSink<IN extends Tuple> extends
 	@Override
 	public void invoke() throws Exception {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK " + name + " invoked");
+			LOG.debug("SINK " + getName() + " invoked");
 		}
 
 		forwardRecords();
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK " + name + " invoke finished");
+			LOG.debug("SINK " + getName() + " invoke finished");
 		}
 	}
 
 	protected void forwardRecords() throws Exception {
-		StreamRecord<IN> reuse = inputSerializer.createInstance();
-		while ((reuse = inputIter.next(reuse)) != null) {
+		StreamRecord<IN> reuse = inputHandler.getInputSerializer().createInstance();
+		while ((reuse = inputHandler.getInputIter().next(reuse)) != null) {
 			if (!pushToQueue(reuse)) {
 				break;
 			}
 			// TODO: Fix object reuse for iteration
-			reuse = inputSerializer.createInstance();
+			reuse = inputHandler.getInputSerializer().createInstance();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index 8f32dd7..d278342 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -30,11 +30,12 @@ import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
-public class StreamIterationSource<OUT extends Tuple> extends
-		SingleInputAbstractStreamComponent<Tuple, OUT> {
+public class StreamIterationSource<OUT extends Tuple> extends AbstractStreamComponent {
 
 	private static final Log LOG = LogFactory.getLog(StreamIterationSource.class);
 
+	private OutputHandler<OUT> outputHandler;
+
 	private static int numSources;
 	private String iterationId;
 	@SuppressWarnings("rawtypes")
@@ -44,8 +45,6 @@ public class StreamIterationSource<OUT extends Tuple> extends
 
 	@SuppressWarnings("rawtypes")
 	public StreamIterationSource() {
-
-		outputHandler = new OutputHandler();
 		numSources = newComponent();
 		instanceID = numSources;
 		dataChannel = new ArrayBlockingQueue<StreamRecord>(1);
@@ -53,11 +52,7 @@ public class StreamIterationSource<OUT extends Tuple> extends
 
 	@Override
 	public void setInputsOutputs() {
-		try {
-			outputHandler.setConfigOutputs();
-		} catch (StreamComponentException e) {
-			throw new StreamComponentException("Cannot register outputs", e);
-		}
+		outputHandler = new OutputHandler<OUT>(this);
 
 		iterationId = configuration.getIterationId();
 		iterationWaitTime = configuration.getIterationWaitTime();
@@ -68,18 +63,17 @@ public class StreamIterationSource<OUT extends Tuple> extends
 		} catch (Exception e) {
 
 		}
-
 	}
 
 	@SuppressWarnings("unchecked")
 	@Override
 	public void invoke() throws Exception {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("SOURCE " + name + " invoked with instance id " + instanceID);
+			LOG.debug("SOURCE " + getName() + " invoked with instance id " + getInstanceID());
 		}
 
 		outputHandler.initializeOutputSerializers();
-		
+
 		StreamRecord<OUT> nextRecord;
 
 		while (true) {
@@ -91,9 +85,10 @@ public class StreamIterationSource<OUT extends Tuple> extends
 			if (nextRecord == null) {
 				break;
 			}
-			for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputHandler.getOutputs()) {
-				outSerializationDelegate.setInstance(nextRecord);
-				output.emit(outSerializationDelegate);
+			for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputHandler
+					.getOutputs()) {
+				outputHandler.outSerializationDelegate.setInstance(nextRecord);
+				output.emit(outputHandler.outSerializationDelegate);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index df95bda..31dd44f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -23,10 +23,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
 
-public class StreamSink<IN> extends SingleInputAbstractStreamComponent<IN, IN> {
+public class StreamSink<IN> extends AbstractStreamComponent {
 
 	private static final Log LOG = LogFactory.getLog(StreamSink.class);
 
+	private InputHandler<IN> inputHandler;
+	
 	private StreamRecordInvokable<IN, IN> userInvokable;
 
 	public StreamSink() {
@@ -35,35 +37,26 @@ public class StreamSink<IN> extends SingleInputAbstractStreamComponent<IN, IN> {
 
 	@Override
 	public void setInputsOutputs() {
-		try {
-			setConfigInputs();
-			setSinkSerializer();
-
-			inputIter = createInputIterator(inputs, inputSerializer);
-		} catch (Exception e) {
-			throw new StreamComponentException("Cannot register inputs for "
-					+ getClass().getSimpleName(), e);
-		}
+		inputHandler = new InputHandler<IN>(this);
 	}
 
 	@Override
 	protected void setInvokable() {
-		userInvokable = getInvokable();
-		userInvokable.initialize(collector, inputIter, inputSerializer, isMutable);
+		userInvokable = configuration.getUserInvokable();
+		userInvokable.initialize(null, inputHandler.getInputIter(), inputHandler.getInputSerializer(),
+				isMutable);
 	}
 
 	@Override
 	public void invoke() throws Exception {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK " + name + " invoked");
+			LOG.debug("SINK " + getName() + " invoked");
 		}
 
-		userInvokable.open(getTaskConfiguration());
-		userInvokable.invoke();
-		userInvokable.close();
+		invokeUserFunction(userInvokable);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK " + name + " invoke finished");
+			LOG.debug("SINK " + getName() + " invoke finished");
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
index 494dfc1..6413cb4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
@@ -22,37 +22,33 @@ package org.apache.flink.streaming.api.streamcomponent;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.invokable.SourceInvokable;
 
-public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamComponent<Tuple, OUT> {
+public class StreamSource<OUT extends Tuple> extends AbstractStreamComponent {
 
-	private SourceInvokable<OUT> userInvokable;
+	protected OutputHandler<OUT> outputHandler;
+
+	private SourceInvokable<OUT> sourceInvokable;
+	
 	private static int numSources;
 
 	public StreamSource() {
-		outputHandler = new OutputHandler();
-		userInvokable = null;
+		sourceInvokable = null;
 		numSources = newComponent();
 		instanceID = numSources;
 	}
 
 	@Override
 	public void setInputsOutputs() {
-		try {
-			outputHandler.setConfigOutputs();
-		} catch (StreamComponentException e) {
-			throw new StreamComponentException("Cannot register outputs for "
-					+ getClass().getSimpleName(), e);
-		}
+		outputHandler = new OutputHandler<OUT>(this);
 	}
 
 	@Override
 	protected void setInvokable() {
-		userInvokable = getInvokable();
-		userInvokable.setCollector(collector);
+		sourceInvokable = configuration.getUserInvokable();
+		sourceInvokable.setCollector(outputHandler.getCollector());
 	}
 
 	@Override
 	public void invoke() throws Exception {
-		outputHandler.invokeUserFunction("SOURCE", userInvokable);
+		outputHandler.invokeUserFunction("SOURCE", sourceInvokable);
 	}
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index 32c10b6..cb017c4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -22,14 +22,16 @@ package org.apache.flink.streaming.api.streamcomponent;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
 
-public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
-		SingleInputAbstractStreamComponent<IN, OUT> {
+public class StreamTask<IN extends Tuple, OUT extends Tuple> extends AbstractStreamComponent {
+
+	private InputHandler<IN> inputHandler;
+	private OutputHandler<OUT> outputHandler;
 
 	private StreamRecordInvokable<IN, OUT> userInvokable;
+	
 	private static int numTasks;
 
 	public StreamTask() {
-		outputHandler = new OutputHandler();
 		userInvokable = null;
 		numTasks = newComponent();
 		instanceID = numTasks;
@@ -37,17 +39,15 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
 
 	@Override
 	public void setInputsOutputs() {
-		setConfigInputs();
-		outputHandler.setConfigOutputs();
-
-		inputIter = createInputIterator(inputs, inputSerializer);
+		inputHandler = new InputHandler<IN>(this);
+		outputHandler = new OutputHandler<OUT>(this);
 	}
 
 	@Override
 	protected void setInvokable() {
-		userInvokable = getInvokable();
-		userInvokable.initialize(collector, inputIter, inputSerializer,
-				isMutable);
+		userInvokable = configuration.getUserInvokable();
+		userInvokable.initialize(outputHandler.getCollector(), inputHandler.getInputIter(),
+				inputHandler.getInputSerializer(), isMutable);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
index 2ac6a47..acde311 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
@@ -24,22 +24,18 @@ import java.io.IOException;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
-public class FunctionTypeWrapper<IN1, IN2, OUT> extends TypeSerializerWrapper<IN1, IN2, OUT> {
+public class FunctionTypeWrapper<T> extends TypeSerializerWrapper<T> {
 	private static final long serialVersionUID = 1L;
 
 	private Function function;
 	private Class<? extends Function> functionSuperClass;
-	private int inTypeParameter1;
-	private int inTypeParameter2;
-	private int outTypeParameter;
+	private int typeParameterNumber;
 
 	public FunctionTypeWrapper(Function function, Class<? extends Function> functionSuperClass,
-			int inTypeParameter1, int inTypeParameter2, int outTypeParameter) {
+			int typeParameterNumber) {
 		this.function = function;
 		this.functionSuperClass = functionSuperClass;
-		this.inTypeParameter1 = inTypeParameter1;
-		this.inTypeParameter2 = inTypeParameter2;
-		this.outTypeParameter = outTypeParameter;
+		this.typeParameterNumber = typeParameterNumber;
 		setTypeInfo();
 	}
 
@@ -51,19 +47,9 @@ public class FunctionTypeWrapper<IN1, IN2, OUT> extends TypeSerializerWrapper<IN
 
 	@Override
 	protected void setTypeInfo() {
-		if (inTypeParameter1 != -1) {
-			inTypeInfo1 = TypeExtractor.createTypeInfo(functionSuperClass, function.getClass(),
-					inTypeParameter1, null, null);
-		}
-
-		if (inTypeParameter2 != -1) {
-			inTypeInfo2 = TypeExtractor.createTypeInfo(functionSuperClass, function.getClass(),
-					inTypeParameter2, null, null);
-		}
-
-		if (outTypeParameter != -1) {
-			outTypeInfo = TypeExtractor.createTypeInfo(functionSuperClass, function.getClass(),
-					outTypeParameter, null, null);
+		if (typeParameterNumber != -1) {
+			typeInfo = TypeExtractor.createTypeInfo(functionSuperClass, function.getClass(),
+					typeParameterNumber, null, null);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
index c1bf52c..7e079d1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
@@ -23,18 +23,14 @@ import java.io.IOException;
 
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
-public class ObjectTypeWrapper<IN1, IN2, OUT> extends
-		TypeSerializerWrapper<IN1, IN2, OUT> {
+public class ObjectTypeWrapper<T> extends
+		TypeSerializerWrapper<T> {
 	private static final long serialVersionUID = 1L;
 
-	private IN1 inInstance1;
-	private IN2 inInstance2;
-	private OUT outInstance;
+	private T instance;
 
-	public ObjectTypeWrapper(IN1 inInstance1, IN2 inInstance2, OUT outInstance) {
-		this.inInstance1 = inInstance1;
-		this.inInstance2 = inInstance2;
-		this.outInstance = outInstance;
+	public ObjectTypeWrapper(T instance) {
+		this.instance = instance;
 		setTypeInfo();
 	}
 
@@ -46,14 +42,8 @@ public class ObjectTypeWrapper<IN1, IN2, OUT> extends
 
 	@Override
 	protected void setTypeInfo() {
-		if (inInstance1 != null) {
-			inTypeInfo1 = TypeExtractor.getForObject(inInstance1);
-		}
-		if (inInstance2 != null) {
-			inTypeInfo2 = TypeExtractor.getForObject(inInstance2);
-		}
-		if (outInstance != null) {
-			outTypeInfo = TypeExtractor.getForObject(outInstance);
+		if (instance != null) {
+			typeInfo = TypeExtractor.getForObject(instance);
 		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
index 2aa50f2..33e5a94 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
@@ -23,33 +23,17 @@ import java.io.Serializable;
 
 import org.apache.flink.types.TypeInformation;
 
-public abstract class TypeSerializerWrapper<IN1, IN2, OUT>
+public abstract class TypeSerializerWrapper<T>
 		implements Serializable {
 	private static final long serialVersionUID = 1L;
 
-	protected transient TypeInformation<IN1> inTypeInfo1 = null;
-	protected transient TypeInformation<IN2> inTypeInfo2 = null;
-	protected transient TypeInformation<OUT> outTypeInfo = null;
-
-	public TypeInformation<IN1> getInputTypeInfo1() {
-		if (inTypeInfo1 == null) {
-			throw new RuntimeException("There is no TypeInfo for the first input");
-		}
-		return inTypeInfo1;
-	}
-
-	public TypeInformation<IN2> getInputTypeInfo2() {
-		if (inTypeInfo2 == null) {
-			throw new RuntimeException("There is no TypeInfo for the second input");
-		}
-		return inTypeInfo2;
-	}
-
-	public TypeInformation<OUT> getOutputTypeInfo() {
-		if (outTypeInfo == null) {
-			throw new RuntimeException("There is no TypeInfo for the output");
+	protected transient TypeInformation<T> typeInfo = null;
+	
+	public TypeInformation<T> getTypeInfo() {
+		if (typeInfo == null) {
+			throw new RuntimeException("There is no TypeInformation in the wrapper");
 		}
-		return outTypeInfo;
+		return typeInfo;
 	}
 
 	protected abstract void setTypeInfo();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index cdcd993..58ec692 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.api;
 
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.LogUtils;
@@ -26,12 +28,30 @@ public class PrintTest{
 
 	private static final long MEMORYSIZE = 32;
 
+	private static final class IdentityMap implements MapFunction<Long, Long> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Long map(Long value) throws Exception {
+			return value;
+		}
+	}
+
+	private static final class FilterAll implements FilterFunction<Long> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(Long value) throws Exception {
+			return true;
+		}
+	}
+	
 	@Test
 	public void test() throws Exception {
 		LogUtils.initializeDefaultTestConsoleLogger();
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-		env.generateSequence(1, 10).print();
+		env.generateSequence(1, 10).map(new IdentityMap()).filter(new FilterAll()).print();
 		env.executeTest(MEMORYSIZE);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
index 7347f62..05a9e76 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
@@ -21,22 +21,18 @@ package org.apache.flink.streaming.util.serialization;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.java.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.junit.Test;
 
 public class TypeSerializationTest {
 
-	private static class MyMap extends RichMapFunction<Tuple1<Integer>, Tuple1<String>> {
+	private static class MyMap extends RichMapFunction<Integer, String> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public Tuple1<String> map(Tuple1<Integer> value) throws Exception {
+		public String map(Integer value) throws Exception {
 			return null;
 		}
 	}
@@ -44,60 +40,35 @@ public class TypeSerializationTest {
 	@SuppressWarnings("unchecked")
 	@Test
 	public void functionTypeSerializationTest() {
-		TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>> ser = new FunctionTypeWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>>(
-				new MyMap(), RichMapFunction.class, 0, -1, 1);
+		TypeSerializerWrapper<Integer> ser = new FunctionTypeWrapper<Integer>(new MyMap(),
+				RichMapFunction.class, 0);
 
 		byte[] serializedType = SerializationUtils.serialize(ser);
 
-		TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>> ser2 = (TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>>) SerializationUtils
+		TypeSerializerWrapper<Integer> ser2 = (TypeSerializerWrapper<Integer>) SerializationUtils
 				.deserialize(serializedType);
 
-		assertNotNull(ser.getInputTypeInfo1());
-		assertNotNull(ser2.getInputTypeInfo1());
+		assertNotNull(ser.getTypeInfo());
+		assertNotNull(ser2.getTypeInfo());
 
-		assertNotNull(ser.getOutputTypeInfo());
-		assertNotNull(ser2.getOutputTypeInfo());
-
-		assertEquals(ser.getInputTypeInfo1(), ser2.getInputTypeInfo1());
-		try {
-			ser.getInputTypeInfo2();
-			fail();
-		} catch (RuntimeException e) {
-			assertTrue(true);
-		}
-		assertEquals(ser.getOutputTypeInfo(), ser2.getOutputTypeInfo());
+		assertEquals(ser.getTypeInfo(), ser2.getTypeInfo());
 	}
 
 	@SuppressWarnings("unchecked")
 	@Test
 	public void objectTypeSerializationTest() {
-		Integer instance1 = new Integer(22);
-		Integer instance2 = null;
-		Integer instance3 = new Integer(34);
-
-		TypeSerializerWrapper<Integer, Integer, Integer> ser = new ObjectTypeWrapper<Integer, Integer, Integer>(
-				instance1, instance2, instance3);
-
-		// System.out.println(ser.getInputTupleTypeInfo1());
-
+		Integer instance = new Integer(22);
+		
+		TypeSerializerWrapper<Integer> ser = new ObjectTypeWrapper<Integer>(instance);
+		
 		byte[] serializedType = SerializationUtils.serialize(ser);
 
-		TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>> ser2 = (TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>>) SerializationUtils
+		TypeSerializerWrapper<Integer> ser2 = (TypeSerializerWrapper<Integer>) SerializationUtils
 				.deserialize(serializedType);
 
-		assertNotNull(ser.getInputTypeInfo1());
-		assertNotNull(ser2.getInputTypeInfo1());
-
-		assertNotNull(ser.getOutputTypeInfo());
-		assertNotNull(ser2.getOutputTypeInfo());
+		assertNotNull(ser.getTypeInfo());
+		assertNotNull(ser2.getTypeInfo());
 
-		assertEquals(ser.getInputTypeInfo1(), ser2.getInputTypeInfo1());
-		try {
-			ser.getInputTypeInfo2();
-			fail();
-		} catch (RuntimeException e) {
-			assertTrue(true);
-		}
-		assertEquals(ser.getOutputTypeInfo(), ser2.getOutputTypeInfo());
+		assertEquals(ser.getTypeInfo(), ser2.getTypeInfo());
 	}
 }