You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:00 UTC
[23/51] [abbrv] [streaming] Wrapped serializers to make component
construction simpler
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index 5532626..911b550 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -29,7 +29,8 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.StringUtils;
-public class StreamIterationSink<IN extends Tuple> extends SingleInputAbstractStreamComponent<IN, IN> {
+public class StreamIterationSink<IN extends Tuple> extends
+ SingleInputAbstractStreamComponent<IN, IN> {
private static final Log LOG = LogFactory.getLog(StreamIterationSink.class);
@@ -51,13 +52,13 @@ public class StreamIterationSink<IN extends Tuple> extends SingleInputAbstractSt
iterationId = configuration.getIterationId();
dataChannel = BlockingQueueBroker.instance().get(iterationId);
-
+
} catch (Exception e) {
throw new StreamComponentException(String.format(
"Cannot register inputs of StreamIterationSink %s", iterationId), e);
}
}
-
+
@Override
public void invoke() throws Exception {
if (LOG.isDebugEnabled()) {
@@ -93,6 +94,5 @@ public class StreamIterationSink<IN extends Tuple> extends SingleInputAbstractSt
@Override
protected void setInvokable() {
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index d020058..cf3d47e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -32,7 +32,8 @@ import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-public class StreamIterationSource<OUT extends Tuple> extends SingleInputAbstractStreamComponent<Tuple, OUT> {
+public class StreamIterationSource<OUT extends Tuple> extends
+ SingleInputAbstractStreamComponent<Tuple, OUT> {
private static final Log LOG = LogFactory.getLog(StreamIterationSource.class);
@@ -68,7 +69,7 @@ public class StreamIterationSource<OUT extends Tuple> extends SingleInputAbstrac
}
}
-
+
@Override
public void invoke() throws Exception {
if (LOG.isDebugEnabled()) {
@@ -100,5 +101,4 @@ public class StreamIterationSource<OUT extends Tuple> extends SingleInputAbstrac
@Override
protected void setInvokable() {
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index 7ac117e..0ead3c6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -28,10 +28,10 @@ public class StreamSink<IN extends Tuple> extends SingleInputAbstractStreamCompo
private static final Log LOG = LogFactory.getLog(StreamSink.class);
- private StreamRecordInvokable<IN, IN> userFunction;
+ private StreamRecordInvokable<IN, IN> userInvokable;
public StreamSink() {
- userFunction = null;
+ userInvokable = null;
}
@Override
@@ -39,18 +39,18 @@ public class StreamSink<IN extends Tuple> extends SingleInputAbstractStreamCompo
try {
setConfigInputs();
setSinkSerializer();
-
+
inputIter = createInputIterator(inputs, inTupleSerializer);
} catch (Exception e) {
throw new StreamComponentException("Cannot register inputs for "
+ getClass().getSimpleName(), e);
}
}
-
+
@Override
- protected void setInvokable() {
- userFunction = getInvokable();
- userFunction.initialize(collector, inputIter, inTupleSerializer, isMutable);
+ protected void setInvokable() {
+ userInvokable = getInvokable();
+ userInvokable.initialize(collector, inputIter, inTupleSerializer, isMutable);
}
@Override
@@ -59,7 +59,7 @@ public class StreamSink<IN extends Tuple> extends SingleInputAbstractStreamCompo
LOG.debug("SINK " + name + " invoked");
}
- userFunction.invoke();
+ userInvokable.invoke();
if (LOG.isDebugEnabled()) {
LOG.debug("SINK " + name + " invoke finished");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
index 856c917..70b8242 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
@@ -27,7 +27,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
+import org.apache.flink.streaming.api.invokable.SourceInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamComponent<Tuple, OUT> {
@@ -35,13 +35,13 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
private static final Log LOG = LogFactory.getLog(StreamSource.class);
private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
- private UserSourceInvokable<OUT> userFunction;
+ private SourceInvokable<OUT> userInvokable;
private static int numSources;
public StreamSource() {
outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
- userFunction = null;
+ userInvokable = null;
numSources = newComponent();
instanceID = numSources;
}
@@ -53,13 +53,14 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
} catch (StreamComponentException e) {
throw new StreamComponentException("Cannot register outputs for "
+ getClass().getSimpleName(), e);
- }
+ }
}
-
+
@Override
protected void setInvokable() {
- // Default value is a TaskInvokable even if it was called from a source
- userFunction = getInvokable();
+ userInvokable = getInvokable();
+ // setCollector();
+ userInvokable.setCollector(collector);
}
@Override
@@ -72,7 +73,7 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
output.initializeSerializers();
}
- userFunction.invoke(collector);
+ userInvokable.invoke();
if (LOG.isDebugEnabled()) {
LOG.debug("SOURCE " + name + " invoke finished with instance id " + instanceID);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index 19b1c4b..7cb1d71 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -36,29 +36,28 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
private static final Log LOG = LogFactory.getLog(StreamTask.class);
private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
- private StreamRecordInvokable<IN, OUT> userFunction;
+ private StreamRecordInvokable<IN, OUT> userInvokable;
private static int numTasks;
public StreamTask() {
outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
- userFunction = null;
+ userInvokable = null;
numTasks = newComponent();
instanceID = numTasks;
}
-
+
@Override
public void setInputsOutputs() {
setConfigInputs();
setConfigOutputs(outputs);
- inputIter = createInputIterator(inputs, inTupleSerializer);
+ inputIter = createInputIterator(inputs, inTupleSerializer);
}
-
+
@Override
protected void setInvokable() {
- // Default value is a TaskInvokable even if it was called from a source
- userFunction = getInvokable();
- userFunction.initialize(collector, inputIter, inTupleSerializer, isMutable);
+ userInvokable = getInvokable();
+ userInvokable.initialize(collector, inputIter, inTupleSerializer, isMutable);
}
@Override
@@ -71,7 +70,7 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
output.initializeSerializers();
}
- userFunction.invoke();
+ userInvokable.invoke();
if (LOG.isDebugEnabled()) {
LOG.debug("TASK " + name + " invoke finished with instance id " + instanceID);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
new file mode 100644
index 0000000..c868e4f
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * {@link TypeSerializerWrapper} that obtains its tuple type information from a
+ * user function by asking {@link TypeExtractor#createTypeInfo} for selected
+ * type parameter positions of one of the function's super classes.
+ * <p>
+ * A position of {@code -1} means that no type information is extracted for
+ * the corresponding slot; the matching field is then left untouched.
+ * <p>
+ * The extracted type information is held in transient fields of the super
+ * class, so it is computed once in the constructor and once more after Java
+ * deserialization (see {@code readObject}).
+ */
+public class FunctionTypeWrapper<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
+        TypeSerializerWrapper<IN1, IN2, OUT> {
+    private static final long serialVersionUID = 1L;
+
+    /** Position value signalling that a slot has no type parameter to extract. */
+    private static final int NO_TYPE_PARAMETER = -1;
+
+    private AbstractFunction function;
+    private Class<? extends AbstractFunction> functionSuperClass;
+    private int inTypeParameter1;
+    private int inTypeParameter2;
+    private int outTypeParameter;
+
+    /**
+     * Stores the extraction settings and immediately extracts the type
+     * information.
+     *
+     * @param function
+     *            the function instance whose runtime class is inspected
+     * @param functionSuperClass
+     *            the base class handed to the type extractor
+     * @param inTypeParameter1
+     *            type parameter position for the first input, or -1 for none
+     * @param inTypeParameter2
+     *            type parameter position for the second input, or -1 for none
+     * @param outTypeParameter
+     *            type parameter position for the output, or -1 for none
+     */
+    public FunctionTypeWrapper(AbstractFunction function,
+            Class<? extends AbstractFunction> functionSuperClass, int inTypeParameter1,
+            int inTypeParameter2, int outTypeParameter) {
+        this.functionSuperClass = functionSuperClass;
+        this.function = function;
+        this.outTypeParameter = outTypeParameter;
+        this.inTypeParameter2 = inTypeParameter2;
+        this.inTypeParameter1 = inTypeParameter1;
+        setTupleTypeInfo();
+    }
+
+    /**
+     * Restores the non-transient state and then rebuilds the transient type
+     * information from it.
+     */
+    private void readObject(java.io.ObjectInputStream in) throws IOException,
+            ClassNotFoundException {
+        in.defaultReadObject();
+        setTupleTypeInfo();
+    }
+
+    /**
+     * Extracts the type information for every slot whose position is not -1.
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    protected void setTupleTypeInfo() {
+        if (inTypeParameter1 != NO_TYPE_PARAMETER) {
+            inTupleTypeInfo1 = extractTupleTypeInfo(inTypeParameter1);
+        }
+
+        if (inTypeParameter2 != NO_TYPE_PARAMETER) {
+            inTupleTypeInfo2 = extractTupleTypeInfo(inTypeParameter2);
+        }
+
+        if (outTypeParameter != NO_TYPE_PARAMETER) {
+            outTupleTypeInfo = extractTupleTypeInfo(outTypeParameter);
+        }
+    }
+
+    /**
+     * Asks the type extractor for the given type parameter position of the
+     * wrapped function and casts the result to a (raw) tuple type info.
+     */
+    @SuppressWarnings("rawtypes")
+    private TupleTypeInfo extractTupleTypeInfo(int typeParameter) {
+        return (TupleTypeInfo) TypeExtractor.createTypeInfo(functionSuperClass,
+                function.getClass(), typeParameter, null, null);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
new file mode 100644
index 0000000..5ed2312
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.IOException;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * {@link TypeSerializerWrapper} that derives its tuple type information from
+ * example instances: for every non-null instance the type information
+ * returned by {@link TypeExtractor#getForObject} is wrapped into a
+ * {@link TupleTypeInfo}.
+ * <p>
+ * The instances themselves are regular (non-transient) fields, so they are
+ * part of the serialized form and are used to rebuild the transient type
+ * information after deserialization.
+ */
+public class ObjectTypeWrapper<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
+        TypeSerializerWrapper<IN1, IN2, OUT> {
+    private static final long serialVersionUID = 1L;
+
+    private Object inInstance1;
+    private Object inInstance2;
+    private Object outInstance;
+
+    /**
+     * Stores the example instances and immediately derives the type
+     * information from them.
+     *
+     * @param inInstance1
+     *            instance describing the first input, or null for none
+     * @param inInstance2
+     *            instance describing the second input, or null for none
+     * @param outInstance
+     *            instance describing the output, or null for none
+     */
+    public ObjectTypeWrapper(Object inInstance1, Object inInstance2, Object outInstance) {
+        this.outInstance = outInstance;
+        this.inInstance2 = inInstance2;
+        this.inInstance1 = inInstance1;
+        setTupleTypeInfo();
+    }
+
+    /**
+     * Restores the stored instances and then rebuilds the transient type
+     * information from them.
+     */
+    private void readObject(java.io.ObjectInputStream in) throws IOException,
+            ClassNotFoundException {
+        in.defaultReadObject();
+        setTupleTypeInfo();
+    }
+
+    /**
+     * Derives the type information for every slot that has a non-null
+     * instance; slots without an instance are left untouched.
+     */
+    @Override
+    protected void setTupleTypeInfo() {
+        if (inInstance1 != null) {
+            inTupleTypeInfo1 = tupleTypeInfoFor(inInstance1);
+        }
+        if (inInstance2 != null) {
+            inTupleTypeInfo2 = tupleTypeInfoFor(inInstance2);
+        }
+        if (outInstance != null) {
+            outTupleTypeInfo = tupleTypeInfoFor(outInstance);
+        }
+    }
+
+    /**
+     * Wraps the type information extracted from the given instance into a
+     * tuple type info of the requested tuple type.
+     */
+    private static <T extends Tuple> TupleTypeInfo<T> tupleTypeInfoFor(Object instance) {
+        return new TupleTypeInfo<T>(TypeExtractor.getForObject(instance));
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
new file mode 100644
index 0000000..473ce7c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+/**
+ * Serializable base class that carries the tuple type information of up to
+ * two inputs and one output.
+ * <p>
+ * The type information fields are transient, so they are not part of the
+ * serialized form. Subclasses implement {@link #setTupleTypeInfo()} to
+ * (re)compute them from their own serializable state.
+ */
+public abstract class TypeSerializerWrapper<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple>
+        implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    protected transient TupleTypeInfo<IN1> inTupleTypeInfo1 = null;
+    protected transient TupleTypeInfo<IN2> inTupleTypeInfo2 = null;
+    protected transient TupleTypeInfo<OUT> outTupleTypeInfo = null;
+
+    /**
+     * Returns the type information of the first input.
+     *
+     * @return the type information of the first input, never null
+     * @throws RuntimeException
+     *             if no type information is set for the first input
+     */
+    public TupleTypeInfo<IN1> getInputTupleTypeInfo1() {
+        if (inTupleTypeInfo1 == null) {
+            throw new RuntimeException("There is no TypeInfo for the first input");
+        }
+        return inTupleTypeInfo1;
+    }
+
+    /**
+     * Returns the type information of the second input.
+     *
+     * @return the type information of the second input; may be null when
+     *         only the first input is set
+     * @throws RuntimeException
+     *             if no type information is set for the first input
+     */
+    public TupleTypeInfo<IN2> getInputTupleTypeInfo2() {
+        // NOTE(review): this guards on the FIRST input, which looks like a
+        // copy-paste slip. It is kept as is because callers that configure
+        // only the first input currently rely on getting null back here;
+        // switch the check to inTupleTypeInfo2 together with those callers.
+        if (inTupleTypeInfo1 == null) {
+            throw new RuntimeException("There is no TypeInfo for the first input");
+        }
+        return inTupleTypeInfo2;
+    }
+
+    /**
+     * Returns the type information of the output.
+     *
+     * @return the type information of the output, never null
+     * @throws RuntimeException
+     *             if no type information is set for the output
+     */
+    public TupleTypeInfo<OUT> getOutputTupleTypeInfo() {
+        // Check the output field itself: a wrapper without a first input can
+        // still have a valid output type.
+        if (outTupleTypeInfo == null) {
+            throw new RuntimeException("There is no TypeInfo for the output");
+        }
+        return outTupleTypeInfo;
+    }
+
+    /**
+     * Computes the transient type information fields from the state of the
+     * concrete wrapper.
+     */
+    protected abstract void setTupleTypeInfo();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
index fc0efcf..0f22262 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.api;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
import java.io.BufferedReader;
import java.io.File;
@@ -30,7 +30,9 @@ import java.util.List;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.util.LogUtils;
import org.apache.flink.util.Collector;
+import org.apache.log4j.Level;
import org.junit.Test;
public class WriteAsTextTest {
@@ -110,9 +112,11 @@ public class WriteAsTextTest {
@Test
public void test() throws Exception {
-
+
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+ LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+
@SuppressWarnings("unused")
DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1(), 1).writeAsText(
"test1.txt");
@@ -151,11 +155,11 @@ public class WriteAsTextTest {
readFile("test4.txt", result4);
readFile("test5.txt", result5);
- assertTrue(expected1.equals(result1));
- assertTrue(expected2.equals(result2));
- assertTrue(expected3.equals(result3));
- assertTrue(expected4.equals(result4));
- assertTrue(expected5.equals(result5));
+ assertEquals(expected1,result1);
+ assertEquals(expected2,result2);
+ assertEquals(expected3,result3);
+ assertEquals(expected4,result4);
+ assertEquals(expected5,result5);
new File("test1.txt").delete();
new File("test2.txt").delete();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
index a18c447..054becc 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
@@ -60,7 +60,7 @@ public class FilterTest implements Serializable {
LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
+
env.fromElements(1, 2, 3, 4, 5, 6, 7).filter(new MyFilter()).addSink(new SetSink());
env.execute();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
new file mode 100644
index 0000000..dceaf46
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
@@ -0,0 +1,91 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.junit.Test;
+
+/**
+ * Checks that the type information held by {@link TypeSerializerWrapper}
+ * implementations is still available, and equal to the original, after a Java
+ * serialization round trip.
+ */
+public class TypeSerializationTest {
+
+    /** Minimal map function whose generic signature is inspected by the test. */
+    private static class MyMap extends MapFunction<Tuple1<Integer>, Tuple1<String>> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Tuple1<String> map(Tuple1<Integer> value) throws Exception {
+            return null;
+        }
+    }
+
+    @Test
+    public void functionTypeSerializationTest() {
+        // The output type matches what MyMap actually produces.
+        TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<String>> ser = new FunctionTypeWrapper<Tuple1<Integer>, Tuple, Tuple1<String>>(
+                new MyMap(), MapFunction.class, 0, -1, 1);
+
+        assertTypeInfoSurvivesSerialization(ser);
+    }
+
+    @Test
+    public void objectTypeSerializationTest() {
+        Integer instance1 = Integer.valueOf(22);
+        Integer instance2 = null;
+        Integer instance3 = Integer.valueOf(34);
+
+        TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>> ser = new ObjectTypeWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>>(
+                instance1, instance2, instance3);
+
+        assertTypeInfoSurvivesSerialization(ser);
+    }
+
+    /**
+     * Serializes and deserializes the wrapper, then verifies that the first
+     * input and the output type information are present on both copies and
+     * that all three type informations are equal between the copies.
+     */
+    private static void assertTypeInfoSurvivesSerialization(TypeSerializerWrapper<?, ?, ?> ser) {
+        byte[] serializedType = SerializationUtils.serialize(ser);
+
+        TypeSerializerWrapper<?, ?, ?> ser2 = (TypeSerializerWrapper<?, ?, ?>) SerializationUtils
+                .deserialize(serializedType);
+
+        assertNotNull(ser.getInputTupleTypeInfo1());
+        assertNotNull(ser2.getInputTupleTypeInfo1());
+
+        assertNotNull(ser.getOutputTupleTypeInfo());
+        assertNotNull(ser2.getOutputTupleTypeInfo());
+
+        assertEquals(ser.getInputTupleTypeInfo1(), ser2.getInputTupleTypeInfo1());
+        // Neither test configures a second input, so both sides are null here.
+        assertEquals(ser.getInputTupleTypeInfo2(), ser2.getInputTupleTypeInfo2());
+        assertEquals(ser.getOutputTupleTypeInfo(), ser2.getOutputTupleTypeInfo());
+    }
+}