You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:00 UTC

[23/51] [abbrv] [streaming] Wrapped serializers to make component construction simpler

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index 5532626..911b550 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -29,7 +29,8 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.util.MutableObjectIterator;
 import org.apache.flink.util.StringUtils;
 
-public class StreamIterationSink<IN extends Tuple> extends SingleInputAbstractStreamComponent<IN, IN> {
+public class StreamIterationSink<IN extends Tuple> extends
+		SingleInputAbstractStreamComponent<IN, IN> {
 
 	private static final Log LOG = LogFactory.getLog(StreamIterationSink.class);
 
@@ -51,13 +52,13 @@ public class StreamIterationSink<IN extends Tuple> extends SingleInputAbstractSt
 
 			iterationId = configuration.getIterationId();
 			dataChannel = BlockingQueueBroker.instance().get(iterationId);
-			
+
 		} catch (Exception e) {
 			throw new StreamComponentException(String.format(
 					"Cannot register inputs of StreamIterationSink %s", iterationId), e);
 		}
 	}
-	
+
 	@Override
 	public void invoke() throws Exception {
 		if (LOG.isDebugEnabled()) {
@@ -93,6 +94,5 @@ public class StreamIterationSink<IN extends Tuple> extends SingleInputAbstractSt
 
 	@Override
 	protected void setInvokable() {
-
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index d020058..cf3d47e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -32,7 +32,8 @@ import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
-public class StreamIterationSource<OUT extends Tuple> extends SingleInputAbstractStreamComponent<Tuple, OUT> {
+public class StreamIterationSource<OUT extends Tuple> extends
+		SingleInputAbstractStreamComponent<Tuple, OUT> {
 
 	private static final Log LOG = LogFactory.getLog(StreamIterationSource.class);
 
@@ -68,7 +69,7 @@ public class StreamIterationSource<OUT extends Tuple> extends SingleInputAbstrac
 		}
 
 	}
-	
+
 	@Override
 	public void invoke() throws Exception {
 		if (LOG.isDebugEnabled()) {
@@ -100,5 +101,4 @@ public class StreamIterationSource<OUT extends Tuple> extends SingleInputAbstrac
 	@Override
 	protected void setInvokable() {
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index 7ac117e..0ead3c6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -28,10 +28,10 @@ public class StreamSink<IN extends Tuple> extends SingleInputAbstractStreamCompo
 
 	private static final Log LOG = LogFactory.getLog(StreamSink.class);
 
-	private StreamRecordInvokable<IN, IN> userFunction;
+	private StreamRecordInvokable<IN, IN> userInvokable;
 
 	public StreamSink() {
-		userFunction = null;
+		userInvokable = null;
 	}
 
 	@Override
@@ -39,18 +39,18 @@ public class StreamSink<IN extends Tuple> extends SingleInputAbstractStreamCompo
 		try {
 			setConfigInputs();
 			setSinkSerializer();
-			
+
 			inputIter = createInputIterator(inputs, inTupleSerializer);
 		} catch (Exception e) {
 			throw new StreamComponentException("Cannot register inputs for "
 					+ getClass().getSimpleName(), e);
 		}
 	}
-	
+
 	@Override
-	protected void setInvokable() {		
-		userFunction = getInvokable();
-		userFunction.initialize(collector, inputIter, inTupleSerializer, isMutable);
+	protected void setInvokable() {
+		userInvokable = getInvokable();
+		userInvokable.initialize(collector, inputIter, inTupleSerializer, isMutable);
 	}
 
 	@Override
@@ -59,7 +59,7 @@ public class StreamSink<IN extends Tuple> extends SingleInputAbstractStreamCompo
 			LOG.debug("SINK " + name + " invoked");
 		}
 
-		userFunction.invoke();
+		userInvokable.invoke();
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("SINK " + name + " invoke finished");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
index 856c917..70b8242 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
@@ -27,7 +27,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
+import org.apache.flink.streaming.api.invokable.SourceInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
 public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamComponent<Tuple, OUT> {
@@ -35,13 +35,13 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
 	private static final Log LOG = LogFactory.getLog(StreamSource.class);
 
 	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
-	private UserSourceInvokable<OUT> userFunction;
+	private SourceInvokable<OUT> userInvokable;
 	private static int numSources;
 
 	public StreamSource() {
 
 		outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
-		userFunction = null;
+		userInvokable = null;
 		numSources = newComponent();
 		instanceID = numSources;
 	}
@@ -53,13 +53,14 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
 		} catch (StreamComponentException e) {
 			throw new StreamComponentException("Cannot register outputs for "
 					+ getClass().getSimpleName(), e);
-		}		
+		}
 	}
-	
+
 	@Override
 	protected void setInvokable() {
-		// Default value is a TaskInvokable even if it was called from a source
-		userFunction = getInvokable();
+		userInvokable = getInvokable();
+		// setCollector();
+		userInvokable.setCollector(collector);
 	}
 
 	@Override
@@ -72,7 +73,7 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
 			output.initializeSerializers();
 		}
 
-		userFunction.invoke(collector);
+		userInvokable.invoke();
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("SOURCE " + name + " invoke finished with instance id " + instanceID);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index 19b1c4b..7cb1d71 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -36,29 +36,28 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
 	private static final Log LOG = LogFactory.getLog(StreamTask.class);
 
 	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
-	private StreamRecordInvokable<IN, OUT> userFunction;
+	private StreamRecordInvokable<IN, OUT> userInvokable;
 	private static int numTasks;
 
 	public StreamTask() {
 		outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
-		userFunction = null;
+		userInvokable = null;
 		numTasks = newComponent();
 		instanceID = numTasks;
 	}
-	
+
 	@Override
 	public void setInputsOutputs() {
 		setConfigInputs();
 		setConfigOutputs(outputs);
 
-		inputIter = createInputIterator(inputs, inTupleSerializer);		
+		inputIter = createInputIterator(inputs, inTupleSerializer);
 	}
-	
+
 	@Override
 	protected void setInvokable() {
-		// Default value is a TaskInvokable even if it was called from a source
-		userFunction = getInvokable();
-		userFunction.initialize(collector, inputIter, inTupleSerializer, isMutable);
+		userInvokable = getInvokable();
+		userInvokable.initialize(collector, inputIter, inTupleSerializer, isMutable);
 	}
 
 	@Override
@@ -71,7 +70,7 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
 			output.initializeSerializers();
 		}
 
-		userFunction.invoke();
+		userInvokable.invoke();
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("TASK " + name + " invoke finished with instance id " + instanceID);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
new file mode 100644
index 0000000..c868e4f
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * Type wrapper that derives tuple type information from the generic signature of a
+ * user function.
+ * <p>
+ * Each position argument is forwarded to {@code TypeExtractor.createTypeInfo} for the
+ * matching slot; a position of {@code -1} leaves that slot unset. The derived type
+ * information lives in transient superclass fields, so it is computed in the
+ * constructor and recomputed after Java deserialization.
+ */
+public class FunctionTypeWrapper<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
+		TypeSerializerWrapper<IN1, IN2, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private AbstractFunction function;
+	private Class<? extends AbstractFunction> functionSuperClass;
+	private int inTypeParameter1;
+	private int inTypeParameter2;
+	private int outTypeParameter;
+
+	/**
+	 * Records the function and the type-parameter positions, then derives the type
+	 * information right away.
+	 *
+	 * @param function
+	 *            function instance whose runtime class is inspected
+	 * @param functionSuperClass
+	 *            base class handed to the type extractor
+	 * @param inTypeParameter1
+	 *            position used for the first input, or -1 for none
+	 * @param inTypeParameter2
+	 *            position used for the second input, or -1 for none
+	 * @param outTypeParameter
+	 *            position used for the output, or -1 for none
+	 */
+	public FunctionTypeWrapper(AbstractFunction function,
+			Class<? extends AbstractFunction> functionSuperClass, int inTypeParameter1,
+			int inTypeParameter2, int outTypeParameter) {
+		this.function = function;
+		this.functionSuperClass = functionSuperClass;
+		this.inTypeParameter1 = inTypeParameter1;
+		this.inTypeParameter2 = inTypeParameter2;
+		this.outTypeParameter = outTypeParameter;
+		setTupleTypeInfo();
+	}
+
+	// Java serialization hook: restores the serialized fields, then rebuilds the
+	// transient type information that is not part of the stream.
+	private void readObject(java.io.ObjectInputStream stream) throws IOException,
+			ClassNotFoundException {
+		stream.defaultReadObject();
+		setTupleTypeInfo();
+	}
+
+	/**
+	 * Fills in the type information of every slot whose position is not -1.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void setTupleTypeInfo() {
+		if (inTypeParameter1 != -1) {
+			inTupleTypeInfo1 = extractTypeInfo(inTypeParameter1);
+		}
+		if (inTypeParameter2 != -1) {
+			inTupleTypeInfo2 = extractTypeInfo(inTypeParameter2);
+		}
+		if (outTypeParameter != -1) {
+			outTupleTypeInfo = extractTypeInfo(outTypeParameter);
+		}
+	}
+
+	// Asks the type extractor for the type at the given position of the function's
+	// class; the cast throws ClassCastException if the result is not a TupleTypeInfo.
+	@SuppressWarnings("rawtypes")
+	private TupleTypeInfo extractTypeInfo(int typeParameter) {
+		return (TupleTypeInfo) TypeExtractor.createTypeInfo(functionSuperClass,
+				function.getClass(), typeParameter, null, null);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
new file mode 100644
index 0000000..5ed2312
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
@@ -0,0 +1,89 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.IOException;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * Type wrapper that derives tuple type information from sample objects.
+ * <p>
+ * For every non-null sample, the type reported by {@code TypeExtractor.getForObject}
+ * is wrapped in a {@code TupleTypeInfo}; a null sample leaves that slot unset. The
+ * result lives in transient superclass fields, so it is computed in the constructor
+ * and recomputed after Java deserialization.
+ */
+public class ObjectTypeWrapper<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
+		TypeSerializerWrapper<IN1, IN2, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private Object inInstance1;
+	private Object inInstance2;
+	private Object outInstance;
+
+	/**
+	 * Keeps the three samples and derives the type information right away.
+	 *
+	 * @param inInstance1
+	 *            sample for the first input, or null for none
+	 * @param inInstance2
+	 *            sample for the second input, or null for none
+	 * @param outInstance
+	 *            sample for the output, or null for none
+	 */
+	public ObjectTypeWrapper(Object inInstance1, Object inInstance2, Object outInstance) {
+		this.inInstance1 = inInstance1;
+		this.inInstance2 = inInstance2;
+		this.outInstance = outInstance;
+		setTupleTypeInfo();
+	}
+
+	// Java serialization hook: restores the serialized samples, then rebuilds the
+	// transient type information that is not part of the stream.
+	private void readObject(java.io.ObjectInputStream stream) throws IOException,
+			ClassNotFoundException {
+		stream.defaultReadObject();
+		setTupleTypeInfo();
+	}
+
+	/**
+	 * Fills in the type information of every slot that has a non-null sample.
+	 */
+	@Override
+	protected void setTupleTypeInfo() {
+		if (inInstance1 != null) {
+			inTupleTypeInfo1 = typeInfoOf(inInstance1);
+		}
+		if (inInstance2 != null) {
+			inTupleTypeInfo2 = typeInfoOf(inInstance2);
+		}
+		if (outInstance != null) {
+			outTupleTypeInfo = typeInfoOf(outInstance);
+		}
+	}
+
+	// Builds a TupleTypeInfo from the sample's extracted type, passed as the single
+	// constructor argument.
+	private static <T extends Tuple> TupleTypeInfo<T> typeInfoOf(Object sample) {
+		return new TupleTypeInfo<T>(TypeExtractor.getForObject(sample));
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
new file mode 100644
index 0000000..473ce7c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
@@ -0,0 +1,96 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+/**
+ * Serializable holder for the tuple type information of up to two inputs and one
+ * output.
+ * <p>
+ * The type information fields are transient; subclasses populate them in
+ * {@link #setTupleTypeInfo()}.
+ */
+public abstract class TypeSerializerWrapper<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple>
+		implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	protected transient TupleTypeInfo<IN1> inTupleTypeInfo1 = null;
+	protected transient TupleTypeInfo<IN2> inTupleTypeInfo2 = null;
+	protected transient TupleTypeInfo<OUT> outTupleTypeInfo = null;
+
+	/**
+	 * Returns the type information of the first input.
+	 *
+	 * @return the first input's type information, never null
+	 * @throws RuntimeException
+	 *             if no type information is set for the first input
+	 */
+	public TupleTypeInfo<IN1> getInputTupleTypeInfo1() {
+		if (inTupleTypeInfo1 == null) {
+			throw new RuntimeException("There is no TypeInfo for the first input");
+		}
+		return inTupleTypeInfo1;
+	}
+
+	/**
+	 * Returns the type information of the second input.
+	 * <p>
+	 * NOTE(review): the guard tests the first input, not the second, so this returns
+	 * null when only the second input is missing. TypeSerializationTest calls this
+	 * getter without a second input and compares the null results, so the guard is
+	 * left as is; tightening it needs a matching change to that test.
+	 *
+	 * @return the second input's type information, possibly null
+	 * @throws RuntimeException
+	 *             if no type information is set for the first input
+	 */
+	public TupleTypeInfo<IN2> getInputTupleTypeInfo2() {
+		if (inTupleTypeInfo1 == null) {
+			throw new RuntimeException("There is no TypeInfo for the first input");
+		}
+		return inTupleTypeInfo2;
+	}
+
+	/**
+	 * Returns the type information of the output.
+	 *
+	 * @return the output's type information, never null
+	 * @throws RuntimeException
+	 *             if no type information is set for the output
+	 */
+	public TupleTypeInfo<OUT> getOutputTupleTypeInfo() {
+		// Guard the field that is actually returned, so a wrapper without a first
+		// input can still hand out its output type.
+		if (outTupleTypeInfo == null) {
+			throw new RuntimeException("There is no TypeInfo for the output");
+		}
+		return outTupleTypeInfo;
+	}
+
+	/**
+	 * Populates whichever of the three type information fields the subclass can
+	 * derive. Because the fields are transient, they are not restored by default
+	 * Java deserialization.
+	 */
+	protected abstract void setTupleTypeInfo();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
index fc0efcf..0f22262 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.streaming.api;
 
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -30,7 +30,9 @@ import java.util.List;
 
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.util.LogUtils;
 import org.apache.flink.util.Collector;
+import org.apache.log4j.Level;
 import org.junit.Test;
 
 public class WriteAsTextTest {
@@ -110,9 +112,11 @@ public class WriteAsTextTest {
 
 	@Test
 	public void test() throws Exception {
-
+		
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
+		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+		
 		@SuppressWarnings("unused")
 		DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1(), 1).writeAsText(
 				"test1.txt");
@@ -151,11 +155,11 @@ public class WriteAsTextTest {
 		readFile("test4.txt", result4);
 		readFile("test5.txt", result5);
 
-		assertTrue(expected1.equals(result1));
-		assertTrue(expected2.equals(result2));
-		assertTrue(expected3.equals(result3));
-		assertTrue(expected4.equals(result4));
-		assertTrue(expected5.equals(result5));
+		assertEquals(expected1,result1);
+		assertEquals(expected2,result2);
+		assertEquals(expected3,result3);
+		assertEquals(expected4,result4);
+		assertEquals(expected5,result5);
 
 		new File("test1.txt").delete();
 		new File("test2.txt").delete();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
index a18c447..054becc 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
@@ -60,7 +60,7 @@ public class FilterTest implements Serializable {
 		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
 		
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
+		
 		env.fromElements(1, 2, 3, 4, 5, 6, 7).filter(new MyFilter()).addSink(new SetSink());
 
 		env.execute();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
new file mode 100644
index 0000000..dceaf46
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
@@ -0,0 +1,97 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.junit.Test;
+
+/**
+ * Checks that type wrappers expose the same type information before and after a Java
+ * serialization round trip.
+ */
+public class TypeSerializationTest {
+
+	// Stub map function; the test only uses its generic signature.
+	private static class MyMap extends MapFunction<Tuple1<Integer>, Tuple1<String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple1<String> map(Tuple1<Integer> value) throws Exception {
+			return null;
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void functionTypeSerializationTest() {
+		// The declared output type matches MyMap's Tuple1<String> result.
+		TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<String>> ser = new FunctionTypeWrapper<Tuple1<Integer>, Tuple, Tuple1<String>>(
+				new MyMap(), MapFunction.class, 0, -1, 1);
+
+		byte[] serializedType = SerializationUtils.serialize(ser);
+
+		TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<String>> ser2 = (TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<String>>) SerializationUtils
+				.deserialize(serializedType);
+
+		assertNotNull(ser.getInputTupleTypeInfo1());
+		assertNotNull(ser2.getInputTupleTypeInfo1());
+
+		assertNotNull(ser.getOutputTupleTypeInfo());
+		assertNotNull(ser2.getOutputTupleTypeInfo());
+
+		assertEquals(ser.getInputTupleTypeInfo1(), ser2.getInputTupleTypeInfo1());
+		// No second input was configured (position -1), so both sides are null here.
+		assertEquals(ser.getInputTupleTypeInfo2(), ser2.getInputTupleTypeInfo2());
+		assertEquals(ser.getOutputTupleTypeInfo(), ser2.getOutputTupleTypeInfo());
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void objectTypeSerializationTest() {
+		Integer instance1 = Integer.valueOf(22);
+		Integer instance2 = null;
+		Integer instance3 = Integer.valueOf(34);
+
+		TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>> ser = new ObjectTypeWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>>(
+				instance1, instance2, instance3);
+
+		byte[] serializedType = SerializationUtils.serialize(ser);
+
+		TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>> ser2 = (TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>>) SerializationUtils
+				.deserialize(serializedType);
+
+		assertNotNull(ser.getInputTupleTypeInfo1());
+		assertNotNull(ser2.getInputTupleTypeInfo1());
+
+		assertNotNull(ser.getOutputTupleTypeInfo());
+		assertNotNull(ser2.getOutputTupleTypeInfo());
+
+		assertEquals(ser.getInputTupleTypeInfo1(), ser2.getInputTupleTypeInfo1());
+		// The second sample is null, so both sides are null here.
+		assertEquals(ser.getInputTupleTypeInfo2(), ser2.getInputTupleTypeInfo2());
+		assertEquals(ser.getOutputTupleTypeInfo(), ser2.getOutputTupleTypeInfo());
+	}
+}