You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2015/11/13 15:53:53 UTC

flink git commit: [FLINK-2432] Custom serializer support

Repository: flink
Updated Branches:
  refs/heads/master 946e8f648 -> 30647a2e6


[FLINK-2432] Custom serializer support

This closes #962


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30647a2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30647a2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30647a2e

Branch: refs/heads/master
Commit: 30647a2e6b74563e441947ad2f5726d9627251c2
Parents: 946e8f6
Author: zentol <ch...@apache.org>
Authored: Fri Nov 13 14:38:05 2015 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Nov 13 15:53:34 2015 +0100

----------------------------------------------------------------------
 .../flink/python/api/streaming/Receiver.java    |  20 ++-
 .../flink/python/api/streaming/Sender.java      |  18 ++-
 .../python/api/types/CustomTypeWrapper.java     |  34 +++++
 .../python/api/flink/connection/Collector.py    |  38 ++++--
 .../python/api/flink/connection/Iterator.py     | 123 +++++++------------
 .../api/flink/functions/CoGroupFunction.py      |   8 +-
 .../python/api/flink/functions/Function.py      |   6 +-
 .../api/flink/functions/GroupReduceFunction.py  |  10 +-
 .../api/flink/functions/ReduceFunction.py       |  10 +-
 .../flink/python/api/flink/plan/Constants.py    |   7 ++
 .../flink/python/api/flink/plan/Environment.py  |  21 +++-
 .../org/apache/flink/python/api/test_main.py    |  28 ++++-
 12 files changed, 215 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/30647a2e/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java
index 07698d3..a706053 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java
@@ -34,6 +34,7 @@ import static org.apache.flink.python.api.streaming.Sender.TYPE_NULL;
 import static org.apache.flink.python.api.streaming.Sender.TYPE_SHORT;
 import static org.apache.flink.python.api.streaming.Sender.TYPE_STRING;
 import static org.apache.flink.python.api.streaming.Sender.TYPE_TUPLE;
+import org.apache.flink.python.api.types.CustomTypeWrapper;
 import org.apache.flink.util.Collector;
 
 /**
@@ -192,7 +193,7 @@ public class Receiver implements Serializable {
 			case TYPE_NULL:
 				return null;
 			default:
-				throw new IllegalArgumentException("Unknown TypeID encountered: " + type);
+				return new CustomTypeDeserializer(type).deserialize();
 		}
 	}
 
@@ -245,14 +246,29 @@ public class Receiver implements Serializable {
 			case TYPE_NULL:
 				return new NullDeserializer();
 			default:
-				throw new IllegalArgumentException("Unknown TypeID encountered: " + type);
+				return new CustomTypeDeserializer(type);
 
 		}
 	}
 
 	private interface Deserializer<T> {
 		public T deserialize();
+	}
+
+	private class CustomTypeDeserializer implements Deserializer<CustomTypeWrapper> {
+		private final byte type;
+
+		public CustomTypeDeserializer(byte type) {
+			this.type = type;
+		}
 
+		@Override
+		public CustomTypeWrapper deserialize() {
+			int size = fileBuffer.getInt();
+			byte[] data = new byte[size];
+			fileBuffer.get(data);
+			return new CustomTypeWrapper(type, data);
+		}
 	}
 
 	private class BooleanDeserializer implements Deserializer<Boolean> {

http://git-wip-us.apache.org/repos/asf/flink/blob/30647a2e/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Sender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Sender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Sender.java
index b897f12..2db1441 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Sender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Sender.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
 import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
+import org.apache.flink.python.api.types.CustomTypeWrapper;
 
 /**
  * General-purpose class to write data to memory-mapped files.
@@ -180,7 +181,7 @@ public class Sender implements Serializable {
 	}
 
 	private enum SupportedTypes {
-		TUPLE, BOOLEAN, BYTE, BYTES, CHARACTER, SHORT, INTEGER, LONG, FLOAT, DOUBLE, STRING, OTHER, NULL
+		TUPLE, BOOLEAN, BYTE, BYTES, CHARACTER, SHORT, INTEGER, LONG, FLOAT, DOUBLE, STRING, OTHER, NULL, CUSTOMTYPEWRAPPER
 	}
 
 	//=====Serializer===================================================================================================
@@ -231,6 +232,9 @@ public class Sender implements Serializable {
 			case NULL:
 				fileBuffer.put(TYPE_NULL);
 				return new NullSerializer();
+			case CUSTOMTYPEWRAPPER:
+				fileBuffer.put(((CustomTypeWrapper) value).getType());
+				return new CustomTypeSerializer();
 			default:
 				throw new IllegalArgumentException("Unknown Type encountered: " + type);
 		}
@@ -253,6 +257,18 @@ public class Sender implements Serializable {
 		public abstract void serializeInternal(T value);
 	}
 
+	private class CustomTypeSerializer extends Serializer<CustomTypeWrapper> {
+		public CustomTypeSerializer() {
+			super(0);
+		}
+		@Override
+		public void serializeInternal(CustomTypeWrapper value) {
+			byte[] bytes = value.getData();
+			buffer = ByteBuffer.wrap(bytes);
+			buffer.position(bytes.length);
+		}
+	}
+
 	private class ByteSerializer extends Serializer<Byte> {
 		public ByteSerializer() {
 			super(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/30647a2e/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java
new file mode 100644
index 0000000..e16c3eb
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.types;
+
+/**
+ * Container for serialized python objects, generally assumed to be custom objects.
+ */
+public class CustomTypeWrapper {
+	private final byte typeID;
+	private final byte[] data;
+
+	public CustomTypeWrapper(byte typeID, byte[] data) {
+		this.typeID = typeID;
+		this.data = data;
+	}
+
+	public byte getType() {
+		return typeID;
+	}
+
+	public byte[] getData() {
+		return data;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/30647a2e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Collector.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Collector.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Collector.py
index bf35756..b5674b9 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Collector.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Collector.py
@@ -19,6 +19,7 @@ from struct import pack
 import sys
 
 from flink.connection.Constants import Types
+from flink.plan.Constants import _Dummy
 
 PY2 = sys.version_info[0] == 2
 PY3 = sys.version_info[0] == 3
@@ -30,15 +31,16 @@ else:
 
 
 class Collector(object):
-    def __init__(self, con):
+    def __init__(self, con, env):
         self._connection = con
         self._serializer = None
+        self._env = env
 
     def _close(self):
         self._connection.send_end_signal()
 
     def collect(self, value):
-        self._serializer = _get_serializer(self._connection.write, value)
+        self._serializer = _get_serializer(self._connection.write, value, self._env._types)
         self.collect = self._collect
         self.collect(value)
 
@@ -46,11 +48,11 @@ class Collector(object):
         self._connection.write(self._serializer.serialize(value))
 
 
-def _get_serializer(write, value):
+def _get_serializer(write, value, custom_types):
     if isinstance(value, (list, tuple)):
         write(Types.TYPE_TUPLE)
         write(pack(">I", len(value)))
-        return TupleSerializer(write, value)
+        return TupleSerializer(write, value, custom_types)
     elif value is None:
         write(Types.TYPE_NULL)
         return NullSerializer()
@@ -70,12 +72,25 @@ def _get_serializer(write, value):
         write(Types.TYPE_DOUBLE)
         return FloatSerializer()
     else:
+        for entry in custom_types:
+            if isinstance(value, entry[1]):
+                write(entry[0])
+                return CustomTypeSerializer(entry[2])
         raise Exception("Unsupported Type encountered.")
 
 
+class CustomTypeSerializer(object):
+    def __init__(self, serializer):
+        self._serializer = serializer
+
+    def serialize(self, value):
+        msg = self._serializer.serialize(value)
+        return b"".join([pack(">i",len(msg)), msg])
+
+
 class TupleSerializer(object):
-    def __init__(self, write, value):
-        self.serializer = [_get_serializer(write, field) for field in value]
+    def __init__(self, write, value, custom_types):
+        self.serializer = [_get_serializer(write, field, custom_types) for field in value]
 
     def serialize(self, value):
         bits = []
@@ -117,8 +132,9 @@ class NullSerializer(object):
 
 
 class TypedCollector(object):
-    def __init__(self, con):
+    def __init__(self, con, env):
         self._connection = con
+        self._env = env
 
     def collect(self, value):
         if not isinstance(value, (list, tuple)):
@@ -153,5 +169,13 @@ class TypedCollector(object):
             value = bytes(value)
             size = pack(">I", len(value))
             self._connection.write(b"".join([Types.TYPE_BYTES, size, value]))
+        elif isinstance(value, _Dummy):
+            self._connection.write(pack(">i", 127)[3:])
+            self._connection.write(pack(">i", 0))
         else:
+            for entry in self._env._types:
+                if isinstance(value, entry[1]):
+                    self._connection.write(entry[0])
+                    self._connection.write(CustomTypeSerializer(entry[2]).serialize(value))
+                    return
             raise Exception("Unsupported Type encountered.")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/30647a2e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py
index fb0e26d..0e740cf 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py
@@ -168,21 +168,25 @@ class CoGroupIterator(object):
 
 
 class Iterator(defIter.Iterator):
-    def __init__(self, con, group=0):
+    def __init__(self, con, env, group=0):
         super(Iterator, self).__init__()
         self._connection = con
         self._init = True
         self._group = group
         self._deserializer = None
+        self._env = env
 
     def __next__(self):
         return self.next()
 
+    def _read(self, des_size):
+        return self._connection.read(des_size, self._group)
+
     def next(self):
         if self.has_next():
             if self._deserializer is None:
-                self._deserializer = _get_deserializer(self._group, self._connection.read)
-            return self._deserializer.deserialize()
+                self._deserializer = _get_deserializer(self._group, self._connection.read, self._env._types)
+            return self._deserializer.deserialize(self._read)
         else:
             raise StopIteration
 
@@ -207,121 +211,88 @@ class DummyIterator(Iterator):
         return False
 
 
-def _get_deserializer(group, read, type=None):
+def _get_deserializer(group, read, custom_types, type=None):
     if type is None:
         type = read(1, group)
-        return _get_deserializer(group, read, type)
+        return _get_deserializer(group, read, custom_types, type)
     elif type == Types.TYPE_TUPLE:
-        return TupleDeserializer(read, group)
+        return TupleDeserializer(read, group, custom_types)
     elif type == Types.TYPE_BYTE:
-        return ByteDeserializer(read, group)
+        return ByteDeserializer()
     elif type == Types.TYPE_BYTES:
-        return ByteArrayDeserializer(read, group)
+        return ByteArrayDeserializer()
     elif type == Types.TYPE_BOOLEAN:
-        return BooleanDeserializer(read, group)
+        return BooleanDeserializer()
     elif type == Types.TYPE_FLOAT:
-        return FloatDeserializer(read, group)
+        return FloatDeserializer()
     elif type == Types.TYPE_DOUBLE:
-        return DoubleDeserializer(read, group)
+        return DoubleDeserializer()
     elif type == Types.TYPE_INTEGER:
-        return IntegerDeserializer(read, group)
+        return IntegerDeserializer()
     elif type == Types.TYPE_LONG:
-        return LongDeserializer(read, group)
+        return LongDeserializer()
     elif type == Types.TYPE_STRING:
-        return StringDeserializer(read, group)
+        return StringDeserializer()
     elif type == Types.TYPE_NULL:
-        return NullDeserializer(read, group)
+        return NullDeserializer()
+    else:
+        for entry in custom_types:
+            if type == entry[0]:
+                return entry[3]
+        raise Exception("Unable to find deserializer for type ID " + str(type))
 
 
 class TupleDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-        size = unpack(">I", self.read(4, self._group))[0]
-        self.deserializer = [_get_deserializer(self._group, self.read) for _ in range(size)]
+    def __init__(self, read, group, custom_types):
+        size = unpack(">I", read(4, group))[0]
+        self.deserializer = [_get_deserializer(group, read, custom_types) for _ in range(size)]
 
-    def deserialize(self):
-        return tuple([s.deserialize() for s in self.deserializer])
+    def deserialize(self, read):
+        return tuple([s.deserialize(read) for s in self.deserializer])
 
 
 class ByteDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
-        return unpack(">c", self.read(1, self._group))[0]
+    def deserialize(self, read):
+        return unpack(">c", read(1))[0]
 
 
 class ByteArrayDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
-        size = unpack(">i", self.read(4, self._group))[0]
-        return bytearray(self.read(size, self._group)) if size else bytearray(b"")
+    def deserialize(self, read):
+        size = unpack(">i", read(4))[0]
+        return bytearray(read(size)) if size else bytearray(b"")
 
 
 class BooleanDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
-        return unpack(">?", self.read(1, self._group))[0]
+    def deserialize(self, read):
+        return unpack(">?", read(1))[0]
 
 
 class FloatDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
-        return unpack(">f", self.read(4, self._group))[0]
+    def deserialize(self, read):
+        return unpack(">f", read(4))[0]
 
 
 class DoubleDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
-        return unpack(">d", self.read(8, self._group))[0]
+    def deserialize(self, read):
+        return unpack(">d", read(8))[0]
 
 
 class IntegerDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
-        return unpack(">i", self.read(4, self._group))[0]
+    def deserialize(self, read):
+        return unpack(">i", read(4))[0]
 
 
 class LongDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
-        return unpack(">q", self.read(8, self._group))[0]
+    def deserialize(self, read):
+        return unpack(">q", read(8))[0]
 
 
 class StringDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
-        length = unpack(">i", self.read(4, self._group))[0]
-        return self.read(length, self._group).decode("utf-8") if length else ""
+    def deserialize(self, read):
+        length = unpack(">i", read(4))[0]
+        return read(length).decode("utf-8") if length else ""
 
 
 class NullDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
+    def deserialize(self, read):
         return None

http://git-wip-us.apache.org/repos/asf/flink/blob/30647a2e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
index db951fe..9c55787 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
@@ -25,13 +25,13 @@ class CoGroupFunction(Function.Function):
         self._keys1 = None
         self._keys2 = None
 
-    def _configure(self, input_file, output_file, port):
+    def _configure(self, input_file, output_file, port, env):
         self._connection = Connection.TwinBufferingTCPMappedFileConnection(input_file, output_file, port)
-        self._iterator = Iterator.Iterator(self._connection, 0)
-        self._iterator2 = Iterator.Iterator(self._connection, 1)
+        self._iterator = Iterator.Iterator(self._connection, env, 0)
+        self._iterator2 = Iterator.Iterator(self._connection, env, 1)
         self._cgiter = Iterator.CoGroupIterator(self._iterator, self._iterator2, self._keys1, self._keys2)
         self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
-        self._configure_chain(Collector.Collector(self._connection))
+        self._configure_chain(Collector.Collector(self._connection, env))
 
     def _run(self):
         collector = self._collector

http://git-wip-us.apache.org/repos/asf/flink/blob/30647a2e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
index 5323462..4bf8b3a 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
@@ -32,11 +32,11 @@ class Function(object):
         self.context = None
         self._chain_operator = None
 
-    def _configure(self, input_file, output_file, port):
+    def _configure(self, input_file, output_file, port, env):
         self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
-        self._iterator = Iterator.Iterator(self._connection)
+        self._iterator = Iterator.Iterator(self._connection, env)
         self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
-        self._configure_chain(Collector.Collector(self._connection))
+        self._configure_chain(Collector.Collector(self._connection, env))
 
     def _configure_chain(self, collector):
         if self._chain_operator is not None:

http://git-wip-us.apache.org/repos/asf/flink/blob/30647a2e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
index 11bba30..23e39ab 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
@@ -29,19 +29,19 @@ class GroupReduceFunction(Function.Function):
         self._combine = False
         self._values = []
 
-    def _configure(self, input_file, output_file, port):
+    def _configure(self, input_file, output_file, port, env):
         if self._combine:
             self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
-            self._iterator = Iterator.Iterator(self._connection)
-            self._collector = Collector.Collector(self._connection)
+            self._iterator = Iterator.Iterator(self._connection, env)
+            self._collector = Collector.Collector(self._connection, env)
             self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
             self._run = self._run_combine
         else:
             self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
-            self._iterator = Iterator.Iterator(self._connection)
+            self._iterator = Iterator.Iterator(self._connection, env)
             self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys)
             self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
-            self._configure_chain(Collector.Collector(self._connection))
+            self._configure_chain(Collector.Collector(self._connection, env))
         self._open()
 
     def _open(self):

http://git-wip-us.apache.org/repos/asf/flink/blob/30647a2e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
index ffa6de0..4d19c13 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
@@ -27,21 +27,21 @@ class ReduceFunction(Function.Function):
         self._combine = False
         self._values = []
 
-    def _configure(self, input_file, output_file, port):
+    def _configure(self, input_file, output_file, port, env):
         if self._combine:
             self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
-            self._iterator = Iterator.Iterator(self._connection)
-            self._collector = Collector.Collector(self._connection)
+            self._iterator = Iterator.Iterator(self._connection, env)
+            self._collector = Collector.Collector(self._connection, env)
             self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
             self._run = self._run_combine
         else:
             self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
-            self._iterator = Iterator.Iterator(self._connection)
+            self._iterator = Iterator.Iterator(self._connection, env)
             if self._keys is None:
                 self._run = self._run_allreduce
             else:
                 self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys)
-            self._configure_chain(Collector.Collector(self._connection))
+            self._configure_chain(Collector.Collector(self._connection, env))
             self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
 
     def _set_grouping_keys(self, keys):

http://git-wip-us.apache.org/repos/asf/flink/blob/30647a2e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
index f60273f..b0d79e8 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
@@ -91,6 +91,11 @@ import sys
 PY2 = sys.version_info[0] == 2
 PY3 = sys.version_info[0] == 3
 
+
+class _Dummy(object):
+    pass
+
+
 if PY2:
     BOOL = True
     INT = 1
@@ -98,9 +103,11 @@ if PY2:
     FLOAT = 2.5
     STRING = "type"
     BYTES = bytearray(b"byte")
+    CUSTOM = _Dummy()
 elif PY3:
     BOOL = True
     INT = 1
     FLOAT = 2.5
     STRING = "type"
     BYTES = bytearray(b"byte")
+    CUSTOM = _Dummy()

http://git-wip-us.apache.org/repos/asf/flink/blob/30647a2e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 236eda4..8647686 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -22,7 +22,7 @@ from flink.plan.Constants import _Fields, _Identifier
 from flink.utilities import Switch
 import copy
 import sys
-
+from struct import pack
 
 def get_environment():
     """
@@ -49,6 +49,19 @@ class Environment(object):
         #specials
         self._broadcast = []
 
+        self._types = []
+
+    def register_type(self, type, serializer, deserializer):
+        """
+        Registers the given type with this environment, allowing all operators within to
+        (de-)serialize objects of the given type.
+
+        :param type: class of the objects to be (de-)serialized
+        :param serializer: instance of the serializer
+        :param deserializer: instance of the deserializer
+        """
+        self._types.append((pack(">i",126 - len(self._types))[3:], type, serializer, deserializer))
+
     def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','):
         """
         Create a DataSet that represents the tuples produced by reading the given CSV file.
@@ -127,7 +140,7 @@ class Environment(object):
         if plan_mode:
             output_path = sys.stdin.readline().rstrip('\n')
             self._connection = Connection.OneWayBusyBufferingMappedFileConnection(output_path)
-            self._collector = Collector.TypedCollector(self._connection)
+            self._collector = Collector.TypedCollector(self._connection, self)
             self._send_plan()
             self._connection._write_buffer()
         else:
@@ -146,7 +159,7 @@ class Environment(object):
                         operator = set[_Fields.OPERATOR]
                     if set[_Fields.ID] == -id:
                         operator = set[_Fields.COMBINEOP]
-                operator._configure(input_path, output_path, port)
+                operator._configure(input_path, output_path, port, self)
                 operator._go()
                 sys.stdout.flush()
                 sys.stderr.flush()
@@ -342,4 +355,4 @@ class Environment(object):
             collect(_Identifier.BROADCAST)
             collect(entry[_Fields.PARENT][_Fields.ID])
             collect(entry[_Fields.OTHER][_Fields.ID])
-            collect(entry[_Fields.NAME])
\ No newline at end of file
+            collect(entry[_Fields.NAME])

http://git-wip-us.apache.org/repos/asf/flink/blob/30647a2e/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
index 2116d1f..2666945 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
@@ -25,7 +25,8 @@ from flink.functions.CrossFunction import CrossFunction
 from flink.functions.JoinFunction import JoinFunction
 from flink.functions.GroupReduceFunction import GroupReduceFunction
 from flink.functions.CoGroupFunction import CoGroupFunction
-from flink.plan.Constants import INT, STRING, FLOAT, BOOL, Order
+from flink.plan.Constants import INT, STRING, FLOAT, BOOL, CUSTOM, Order
+import struct
 
 
 class Mapper(MapFunction):
@@ -259,6 +260,31 @@ if __name__ == "__main__":
         .co_group(d5).where(0).equal_to(2).using(CoGroup(), ((INT, FLOAT, STRING, BOOL), (FLOAT, FLOAT, INT))) \
         .map_partition(Verify([((1, 0.5, "hello", True), (4.4, 4.3, 1)), ((1, 0.4, "hello", False), (4.3, 4.4, 1))], "CoGroup"), STRING).output()
 
+    #Custom Serialization
+    class Ext(MapPartitionFunction):
+        def map_partition(self, iterator, collector):
+            for value in iterator:
+                collector.collect(value.value)
+
+    class MyObj(object):
+        def __init__(self, i):
+            self.value = i
+
+    class MySerializer(object):
+        def serialize(self, value):
+            return struct.pack(">i", value.value)
+
+    class MyDeserializer(object):
+        def deserialize(self, read):
+            i = struct.unpack(">i", read(4))[0]
+            return MyObj(i)
+
+    env.register_type(MyObj, MySerializer(), MyDeserializer())
+
+    env.from_elements(MyObj(2), MyObj(4)) \
+        .map(Id(), CUSTOM).map_partition(Ext(), INT) \
+        .map_partition(Verify([2, 4], "CustomTypeSerialization"), STRING).output()
+
     env.set_degree_of_parallelism(1)
 
     env.execute(local=True)