You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/04/21 15:47:40 UTC

[04/10] flink git commit: [FLINK-671] Python API

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
new file mode 100644
index 0000000..7b1c5c5
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
@@ -0,0 +1,327 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from struct import unpack
+from collections import deque
+
+try:
+    import _abcoll as defIter
+except:
+    import _collections_abc as defIter
+
+from flink.connection.Constants import Types
+
+
+class ListIterator(defIter.Iterator):
+    """Iterator over an in-memory sequence of values.
+
+    The values are copied into a deque so that consuming them from the
+    front (``popleft``) is O(1).
+    """
+
+    def __init__(self, values):
+        super(ListIterator, self).__init__()
+        self._values = deque(values)
+
+    def __next__(self):
+        return self.next()
+
+    def next(self):
+        """Return and remove the next value; raise StopIteration when exhausted."""
+        if self.has_next():
+            return self._values.popleft()
+        raise StopIteration
+
+    def has_next(self):
+        """Return True if at least one value remains.
+
+        Returns a real bool (not the underlying deque) so the result matches
+        the ``has_next`` contract of the other iterators in this module.
+        """
+        return len(self._values) > 0
+
+
+class GroupIterator(defIter.Iterator):
+    """Iterates over one group of records at a time from an underlying iterator.
+
+    ``keys`` is a sequence of field indices forming the grouping key; when it
+    is None the whole record acts as the key. A group ends as soon as two
+    consecutive records have different keys, so equal keys must be adjacent
+    in the input.
+    """
+
+    def __init__(self, iterator, keys=None):
+        super(GroupIterator, self).__init__()
+        self.iterator = iterator
+        self.key = None
+        self.keys = keys
+        if keys is None:
+            # No key fields given: compare whole records instead.
+            self._extract_keys = self._extract_keys_id
+        self.cur = None
+        self.empty = False
+
+    def _init(self):
+        """Buffer the first record (if any) and make its key the active key."""
+        self.empty = not self.iterator.has_next()
+        if not self.empty:
+            self.cur = self.iterator.next()
+            self.key = self._extract_keys(self.cur)
+
+    def __next__(self):
+        return self.next()
+
+    def next(self):
+        """Return the buffered record of the active group and advance by one."""
+        if not self.has_next():
+            raise StopIteration
+        result = self.cur
+        if not self.iterator.has_next():
+            # Underlying input exhausted: no further records or groups.
+            self.cur = None
+            self.empty = True
+            return result
+        self.cur = self.iterator.next()
+        # A key change marks the end of the active group.
+        if self.key != self._extract_keys(self.cur):
+            self.empty = True
+        return result
+
+    def has_next(self):
+        """Return True while the buffered record belongs to the active group."""
+        return (not self.empty) and self.key == self._extract_keys(self.cur)
+
+    def has_group(self):
+        """Return True if a record is buffered, i.e. another group can start."""
+        return self.cur is not None
+
+    def next_group(self):
+        """Start a new active group at the buffered record."""
+        self.key = self._extract_keys(self.cur)
+        self.empty = False
+
+    def _extract_keys(self, record):
+        return [record[idx] for idx in self.keys]
+
+    def _extract_keys_id(self, record):
+        return record
+
+
+class CoGroupIterator(object):
+    """Walks two key-grouped inputs in lockstep, one key at a time.
+
+    After each successful ``next()``, ``p1`` and ``p2`` expose the groups of
+    input 1 and input 2 for the current key. A side with no group for that
+    key is given a DummyIterator. The merge hands out the smaller key first,
+    so both inputs are assumed to be sorted ascending by key.
+
+    Callers must fully consume (or drain) ``p1``/``p2`` before calling
+    ``next()`` again: ``GroupIterator.next_group`` starts the new group at
+    whatever record is currently buffered.
+    """
+    # States stored in ``self.match`` after each call to ``next()``.
+    NONE_REMAINED = 1  # both groups had the same key and were handed out
+    FIRST_REMAINED = 2  # input 1's group had the larger key; it is kept for the next call
+    SECOND_REMAINED = 3  # input 2's group had the larger key; it is kept for the next call
+    FIRST_EMPTY = 4  # input 1 is exhausted
+    SECOND_EMPTY = 5  # input 2 is exhausted
+
+    def __init__(self, c1, c2, k1, k2):
+        # c1/c2: record iterators (has_next/next); k1/k2: key field indices,
+        # or None to group on the whole record.
+        self.i1 = GroupIterator(c1, k1)
+        self.i2 = GroupIterator(c2, k2)
+        self.p1 = None
+        self.p2 = None
+        self.match = None
+        self.key = None  # current key of input 1's active group
+
+    def _init(self):
+        """Buffer the first record of both inputs."""
+        self.i1._init()
+        self.i2._init()
+
+    def next(self):
+        """Advance to the next key; return False once both inputs are exhausted."""
+        first_empty = True
+        second_empty = True
+
+        # Input 1: reuse a retained group, or start its next group if any.
+        if self.match != CoGroupIterator.FIRST_EMPTY:
+            if self.match == CoGroupIterator.FIRST_REMAINED:
+                first_empty = False
+            else:
+                if self.i1.has_group():
+                    self.i1.next_group()
+                    self.key = self.i1.key
+                    first_empty = False
+
+        # Input 2: same logic as above.
+        if self.match != CoGroupIterator.SECOND_EMPTY:
+            if self.match == CoGroupIterator.SECOND_REMAINED:
+                second_empty = False
+            else:
+                if self.i2.has_group():
+                    self.i2.next_group()
+                    second_empty = False
+
+        if first_empty and second_empty:
+            return False
+        elif first_empty and (not second_empty):
+            self.p1 = DummyIterator()
+            self.p2 = self.i2
+            self.match = CoGroupIterator.FIRST_EMPTY
+            return True
+        elif (not first_empty) and second_empty:
+            self.p1 = self.i1
+            self.p2 = DummyIterator()
+            self.match = CoGroupIterator.SECOND_EMPTY
+            return True
+        else:
+            # Both sides have a group: pair them if the keys match, otherwise
+            # hand out only the smaller key and retain the other group.
+            if self.key == self.i2.key:
+                self.p1 = self.i1
+                self.p2 = self.i2
+                self.match = CoGroupIterator.NONE_REMAINED
+            elif self.key < self.i2.key:
+                self.p1 = self.i1
+                self.p2 = DummyIterator()
+                self.match = CoGroupIterator.SECOND_REMAINED
+            else:
+                self.p1 = DummyIterator()
+                self.p2 = self.i2
+                self.match = CoGroupIterator.FIRST_REMAINED
+            return True
+
+
+class Iterator(defIter.Iterator):
+    """Iterator over the records of one input group of a connection.
+
+    The deserializer is chosen lazily, on the first ``next()``, by
+    ``_get_deserializer`` (which reads a type marker from the connection).
+    It is then reused for every following record until ``_reset``.
+    """
+
+    def __init__(self, con, group=0):
+        super(Iterator, self).__init__()
+        self._connection = con
+        self._init = True
+        self._group = group
+        self._deserializer = None
+
+    def __next__(self):
+        return self.next()
+
+    def next(self):
+        if not self.has_next():
+            raise StopIteration
+        if self._deserializer is None:
+            self._deserializer = _get_deserializer(self._group, self._connection.read)
+        return self._deserializer.deserialize()
+
+    def has_next(self):
+        """Ask the connection whether this iterator's group has more records."""
+        return self._connection.has_next(self._group)
+
+    def _reset(self):
+        """Drop the cached deserializer so the next record selects a new one."""
+        self._deserializer = None
+
+
+class DummyIterator(Iterator):
+    """Always-empty iterator; CoGroupIterator uses it for a side with no group."""
+
+    def __init__(self):
+        # Deliberately bypass Iterator.__init__: a dummy has no connection.
+        super(Iterator, self).__init__()
+
+    def next(self):
+        raise StopIteration
+
+    __next__ = next
+
+    def has_next(self):
+        return False
+
+
+def _get_deserializer(group, read, type=None):
+    """Create the deserializer for the next value of ``group``.
+
+    :param group: input group index, passed through to ``read``.
+    :param read: callable ``read(n, group)`` returning the next ``n`` bytes.
+    :param type: type marker of the value; when None, a 1-byte marker is
+        read from the stream first.
+    :return: a deserializer object exposing ``deserialize()``.
+    :raises ValueError: if the marker matches none of the known ``Types``.
+    """
+    if type is None:
+        type = read(1, group)
+    if type == Types.TYPE_TUPLE:
+        return TupleDeserializer(read, group)
+    if type == Types.TYPE_BYTE:
+        return ByteDeserializer(read, group)
+    if type == Types.TYPE_BYTES:
+        return ByteArrayDeserializer(read, group)
+    if type == Types.TYPE_BOOLEAN:
+        return BooleanDeserializer(read, group)
+    if type == Types.TYPE_FLOAT:
+        return FloatDeserializer(read, group)
+    if type == Types.TYPE_DOUBLE:
+        return DoubleDeserializer(read, group)
+    if type == Types.TYPE_INTEGER:
+        return IntegerDeserializer(read, group)
+    if type == Types.TYPE_LONG:
+        return LongDeserializer(read, group)
+    if type == Types.TYPE_STRING:
+        return StringDeserializer(read, group)
+    if type == Types.TYPE_NULL:
+        return NullDeserializer(read, group)
+    # Falling through with None would only fail later, as an opaque
+    # AttributeError on ``None.deserialize()``; report the bad marker here.
+    raise ValueError("Unknown type marker %r in group %r" % (type, group))
+
+
+class TupleDeserializer(object):
+    """Deserializes a tuple whose field layout is read once, up front.
+
+    The header is a big-endian unsigned 32-bit field count (``>I``). It is
+    followed by one deserializer description per field, each consumed via
+    ``_get_deserializer``.
+    """
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+        (field_count,) = unpack(">I", self.read(4, self._group))
+        self.deserializer = []
+        for _ in range(field_count):
+            self.deserializer.append(_get_deserializer(self._group, self.read))
+
+    def deserialize(self):
+        return tuple(field.deserialize() for field in self.deserializer)
+
+
+class ByteDeserializer(object):
+    """Reads one byte and returns it as produced by ``struct`` format ``>c``."""
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+
+    def deserialize(self):
+        (value,) = unpack(">c", self.read(1, self._group))
+        return value
+
+
+class ByteArrayDeserializer(object):
+    """Reads a big-endian signed 32-bit length, then that many raw bytes."""
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+
+    def deserialize(self):
+        header = self.read(4, self._group)
+        length = unpack(">i", header)[0]
+        payload = self.read(length, self._group)
+        return bytearray(payload)
+
+
+class BooleanDeserializer(object):
+    """Reads one byte and decodes it as a bool (``struct`` format ``>?``)."""
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+
+    def deserialize(self):
+        (flag,) = unpack(">?", self.read(1, self._group))
+        return flag
+
+
+class FloatDeserializer(object):
+    """Reads a big-endian 32-bit IEEE float (``>f``)."""
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+
+    def deserialize(self):
+        (number,) = unpack(">f", self.read(4, self._group))
+        return number
+
+
+class DoubleDeserializer(object):
+    """Reads a big-endian 64-bit IEEE double (``>d``)."""
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+
+    def deserialize(self):
+        (number,) = unpack(">d", self.read(8, self._group))
+        return number
+
+
+class IntegerDeserializer(object):
+    """Reads a big-endian signed 32-bit integer (``>i``)."""
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+
+    def deserialize(self):
+        (number,) = unpack(">i", self.read(4, self._group))
+        return number
+
+
+class LongDeserializer(object):
+    """Reads a big-endian signed 64-bit integer (``>q``)."""
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+
+    def deserialize(self):
+        (number,) = unpack(">q", self.read(8, self._group))
+        return number
+
+
+class StringDeserializer(object):
+    """Reads a big-endian signed 32-bit byte length, then UTF-8 text."""
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+
+    def deserialize(self):
+        (byte_count,) = unpack(">i", self.read(4, self._group))
+        raw = self.read(byte_count, self._group)
+        return raw.decode("utf-8")
+
+
+class NullDeserializer(object):
+    """Deserializer for the null type: consumes no bytes and yields None."""
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+
+    def deserialize(self):
+        return None

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc
new file mode 100644
index 0000000..7f3cf94
Binary files /dev/null and b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc differ

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TriangleEnumeration.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TriangleEnumeration.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TriangleEnumeration.py
new file mode 100644
index 0000000..2727635
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TriangleEnumeration.py
@@ -0,0 +1,152 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import INT, Order
+from flink.functions.FlatMapFunction import FlatMapFunction
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+from flink.functions.ReduceFunction import ReduceFunction
+from flink.functions.MapFunction import MapFunction
+from flink.functions.JoinFunction import JoinFunction
+
+
+class EdgeDuplicator(FlatMapFunction):
+    """Emits every edge in both directions so each endpoint sees the edge."""
+
+    def flat_map(self, value, collector):
+        source, target = value[0], value[1]
+        for edge in ((source, target), (target, source)):
+            collector.collect(edge)
+
+
+class DegreeCounter(GroupReduceFunction):
+    """Computes a vertex's degree and annotates each of its edges with it.
+
+    Input: all (vertex, neighbour) edges of one vertex (grouped on field 0).
+    Output: one (v1, d1, v2, d2) record per distinct neighbour with v1 < v2.
+    Only the grouped vertex's degree is filled in; the neighbour's slot is 0
+    and is completed later by DegreeJoiner.
+    """
+
+    def reduce(self, iterator, collector):
+        data = iterator.next()
+        group_vertex = data[0]
+
+        # Distinct neighbours in first-seen order. The set gives O(1)
+        # membership tests instead of a linear scan per edge.
+        other_vertices = []
+        seen = set()
+        while True:
+            other_vertex = data[1]
+            # Skip duplicates and self-loops, including on the first edge.
+            if other_vertex != group_vertex and other_vertex not in seen:
+                seen.add(other_vertex)
+                other_vertices.append(other_vertex)
+            if not iterator.has_next():
+                break
+            data = iterator.next()
+
+        degree = len(other_vertices)
+
+        for other_vertex in other_vertices:
+            if group_vertex < other_vertex:
+                output_edge = (group_vertex, degree, other_vertex, 0)
+            else:
+                output_edge = (other_vertex, 0, group_vertex, degree)
+            collector.collect(output_edge)
+
+
+class DegreeJoiner(ReduceFunction):
+    """Merges two half-annotated copies of the same edge into one record.
+
+    Each input is (v1, d1, v2, d2). If exactly one degree of ``value1`` is 0,
+    that degree is taken from ``value2``. The result is returned as a list.
+    """
+
+    def reduce(self, value1, value2):
+        merged = [value1[i] for i in range(4)]
+        other = [value2[i] for i in range(4)]
+        first_missing = merged[1] == 0
+        second_missing = merged[3] == 0
+        if first_missing and not second_missing:
+            merged[1] = other[1]
+        elif second_missing and not first_missing:
+            merged[3] = other[3]
+        return merged
+
+
+class EdgeByDegreeProjector(MapFunction):
+    """Orients an annotated edge so it starts at the lower-degree vertex.
+
+    Ties keep the original (v1, v2) order.
+    """
+
+    def map(self, value):
+        first, first_degree = value[0], value[1]
+        second, second_degree = value[2], value[3]
+        return (second, first) if first_degree > second_degree else (first, second)
+
+
+class EdgeByIdProjector(MapFunction):
+    """Normalizes an edge to (smaller id, larger id)."""
+
+    def map(self, value):
+        if value[0] > value[1]:
+            return (value[1], value[0])
+        return (value[0], value[1])
+
+
+class TriadBuilder(GroupReduceFunction):
+    """Builds open triads from the edges that leave one vertex.
+
+    For each incoming edge, emits (apex, earlier_neighbour, this_neighbour)
+    for every neighbour seen before it in the group.
+    """
+
+    def reduce(self, iterator, collector):
+        first = iterator.next()
+        apex = first[0]
+        earlier = [first[1]]
+
+        while iterator.has_next():
+            current = iterator.next()
+            higher_vertex_id = current[1]
+            for lower_vertex_id in earlier:
+                collector.collect((apex, lower_vertex_id, higher_vertex_id))
+            earlier.append(higher_vertex_id)
+
+
+class TriadFilter(JoinFunction):
+    """Emits the triad (left input) unchanged for every join match."""
+
+    def join(self, value1, value2):
+        triad = value1
+        return triad
+
+
+if __name__ == "__main__":
+    env = get_environment()
+    edges = env.from_elements(
+        (1, 2), (1, 3), (1, 4), (1, 5), (2, 3), (2, 5), (3, 4), (3, 7), (3, 8), (5, 6), (7, 8))
+
+    # Step 1: annotate every edge with the degrees of both endpoints.
+    half_annotated = edges \
+        .flat_map(EdgeDuplicator(), [INT, INT]) \
+        .group_by(0) \
+        .sort_group(1, Order.ASCENDING) \
+        .reduce_group(DegreeCounter(), [INT, INT, INT, INT])
+    edges_with_degrees = half_annotated \
+        .group_by(0, 2) \
+        .reduce(DegreeJoiner())
+
+    # Step 2: orient edges by degree, then derive the id-normalized variant.
+    edges_by_degree = edges_with_degrees.map(EdgeByDegreeProjector(), [INT, INT])
+    edges_by_id = edges_by_degree.map(EdgeByIdProjector(), [INT, INT])
+
+    # Step 3: build open triads and keep those closed by an existing edge.
+    triads = edges_by_degree \
+        .group_by(0) \
+        .sort_group(1, Order.ASCENDING) \
+        .reduce_group(TriadBuilder(), [INT, INT, INT])
+    triangles = triads \
+        .join(edges_by_id) \
+        .where(1, 2) \
+        .equal_to(0, 1) \
+        .using(TriadFilter(), [INT, INT, INT])
+
+    triangles.output()
+    env.set_degree_of_parallelism(1)
+    env.execute(local=True)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/WordCount.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/WordCount.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/WordCount.py
new file mode 100644
index 0000000..8a89a6f
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/WordCount.py
@@ -0,0 +1,61 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import INT, STRING
+from flink.functions.FlatMapFunction import FlatMapFunction
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+
+
+class Tokenizer(FlatMapFunction):
+    """Splits a line into lower-cased words and emits (1, word) for each."""
+
+    def flat_map(self, value, collector):
+        words = value.lower().split()
+        for token in words:
+            collector.collect((1, token))
+
+
+class Adder(GroupReduceFunction):
+    """Sums the counts of one word's group and emits (total, word)."""
+
+    def reduce(self, iterator, collector):
+        first_count, word = iterator.next()
+        total = first_count + sum(record[0] for record in iterator)
+        collector.collect((total, word))
+
+
+if __name__ == "__main__":
+    env = get_environment()
+    # Either no arguments (built-in sample data) or <text path> <result path>.
+    if len(sys.argv) not in (1, 3):
+        sys.exit("Usage: ./bin/pyflink.sh WordCount[ - <text path> <result path>]")
+
+    if len(sys.argv) == 3:
+        data = env.read_text(sys.argv[1])
+    else:
+        data = env.from_elements("hello", "world", "hello", "car", "tree", "data", "hello")
+
+    # Emit (1, word), group by the word field, then sum the counts. The chain
+    # ends without a trailing backslash so no blank continuation line is needed.
+    result = data \
+        .flat_map(Tokenizer(), (INT, STRING)) \
+        .group_by(1) \
+        .reduce_group(Adder(), (INT, STRING), combinable=True)
+
+    if len(sys.argv) == 3:
+        result.write_csv(sys.argv[2])
+    else:
+        result.output()
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/__init__.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/__init__.py
new file mode 100644
index 0000000..d35bf39
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py
new file mode 100644
index 0000000..c39caf7
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py
@@ -0,0 +1,53 @@
+# ###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function, RuntimeContext
+from flink.connection import Iterator, Connection, Collector
+
+
+class CoGroupFunction(Function.Function):
+    """Base class for user co-group functions.
+
+    Subclasses override ``co_group(iterator1, iterator2, collector)``. It is
+    called once per key with the groups of both inputs; a side without that
+    key receives an always-empty iterator. Records may be emitted through
+    ``collector`` or returned as an iterable.
+    """
+
+    def __init__(self):
+        super(CoGroupFunction, self).__init__()
+        # Key field indices of input 1 / input 2 (None groups on whole records).
+        self._keys1 = None
+        self._keys2 = None
+
+    def _configure(self, input_file, output_file, port):
+        self._connection = Connection.TwinBufferingUDPMappedFileConnection(input_file, output_file, port)
+        # Each input is read from its own group (0 and 1) of the shared connection.
+        self._iterator = Iterator.Iterator(self._connection, 0)
+        self._iterator2 = Iterator.Iterator(self._connection, 1)
+        self._cgiter = Iterator.CoGroupIterator(self._iterator, self._iterator2, self._keys1, self._keys2)
+        # NOTE(review): self._collector is read here before _configure_chain
+        # runs below; confirm the base class initializes it.
+        self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
+        self._configure_chain(Collector.Collector(self._connection))
+
+    def _run(self):
+        collector = self._collector
+        user_function = self.co_group
+        cogroup = self._cgiter
+        cogroup._init()
+        while cogroup.next():
+            returned = user_function(cogroup.p1, cogroup.p2, collector)
+            if returned is not None:
+                for record in returned:
+                    collector.collect(record)
+            # Drain whatever the user left unread so both inputs are
+            # positioned at the start of their next group.
+            for side in (cogroup.p1, cogroup.p2):
+                while side.has_next():
+                    side.next()
+        collector._close()
+
+    def co_group(self, iterator1, iterator2, collector):
+        """Override to process the matching groups of both inputs."""
+        pass
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CrossFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CrossFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CrossFunction.py
new file mode 100644
index 0000000..6657d45
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CrossFunction.py
@@ -0,0 +1,34 @@
+# ###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function
+
+
+class CrossFunction(Function.Function):
+    """Base class for user cross functions.
+
+    Each input record is a pair; ``cross(left, right)`` is called with its
+    two elements and the return value is collected.
+    """
+
+    def __init__(self):
+        super(CrossFunction, self).__init__()
+
+    def _run(self):
+        collector = self._collector
+        user_function = self.cross
+        for pair in self._iterator:
+            left, right = pair[0], pair[1]
+            collector.collect(user_function(left, right))
+        collector._close()
+
+    def cross(self, value1, value2):
+        """Override to combine one element of each input."""
+        pass
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/FilterFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/FilterFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/FilterFunction.py
new file mode 100644
index 0000000..fb4fb74
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/FilterFunction.py
@@ -0,0 +1,38 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function
+
+
+class FilterFunction(Function.Function):
+    """Base class for user filter functions.
+
+    Records for which ``filter(value)`` is truthy are forwarded unchanged.
+    """
+
+    def __init__(self):
+        super(FilterFunction, self).__init__()
+
+    def _run(self):
+        collector = self._collector
+        predicate = self.filter
+        for value in filter(predicate, self._iterator):
+            collector.collect(value)
+        collector._close()
+
+    def collect(self, value):
+        """Per-record entry point: forward ``value`` if the filter accepts it."""
+        if not self.filter(value):
+            return
+        self._collector.collect(value)
+
+    def filter(self, value):
+        """Override to return a truthy value for records that should be kept."""
+        pass

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/FlatMapFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/FlatMapFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/FlatMapFunction.py
new file mode 100644
index 0000000..ebc493c
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/FlatMapFunction.py
@@ -0,0 +1,44 @@
+# ###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function
+
+
class FlatMapFunction(Function.Function):
    """Base class for user flat-maps: each input may produce zero or more outputs."""

    def __init__(self):
        super(FlatMapFunction, self).__init__()

    def _run(self):
        """Apply ``flat_map`` to every input record and forward any iterable it returns, then close."""
        out = self._collector
        fn = self.flat_map
        for record in self._iterator:
            produced = fn(record, out)
            if produced is None:
                continue
            for item in produced:
                out.collect(item)
        out._close()

    def collect(self, value):
        """Chained entry point: run ``flat_map`` on one value and forward any returned results."""
        out = self._collector
        produced = self.flat_map(value, out)
        if produced is None:
            return
        for item in produced:
            out.collect(item)

    def flat_map(self, value, collector):
        """Override: emit results via ``collector.collect`` and/or return an iterable of results."""
        pass
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
new file mode 100644
index 0000000..8e5227e
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
@@ -0,0 +1,92 @@
+# ###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from abc import ABCMeta, abstractmethod
+import dill
+import sys
+from collections import deque
+from flink.connection import Connection, Iterator, Collector
+from flink.functions import RuntimeContext
+
+
class Function(object):
    """Common base class of the Python-side operators.

    Holds the connection to the Java side, the input iterator, the output
    collector (either a real ``Collector`` or the next chained operator) and
    the ``RuntimeContext`` exposed to user code as ``self.context``.
    """
    # NOTE(review): the ``__metaclass__`` attribute is only honored on Python 2;
    # on Python 3 this class is not an ABC and ``_run`` is not enforced.
    __metaclass__ = ABCMeta

    def __init__(self):
        self._connection = None
        self._iterator = None
        self._collector = None
        self.context = None
        # dill-serialized operator chained after this one (set via _chain), or None
        self._chain_operator = None
        # "|"-separated description of the chained operator; index 1 is the class name
        self._meta = None

    def _configure(self, input_file, output_file, port):
        """Open the connection, create iterator and context, and set up the output chain."""
        self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
        self._iterator = Iterator.Iterator(self._connection)
        # The context must exist before the chain is configured, because
        # _configure_chain hands it to any chained operator. The collector is
        # only known afterwards, so it is recorded once the chain is in place
        # (previously context.collector was always None).
        self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
        self._configure_chain(Collector.Collector(self._connection))
        self.context.collector = self._collector

    def _configure_chain(self, collector):
        """Install the output path: ``collector`` itself, or the chained operator in front of it.

        If an operator was registered via ``_chain``, it is deserialized with
        dill, given this function's context, configured recursively with
        ``collector`` and opened; it then becomes this function's collector.
        """
        if self._chain_operator is not None:
            frag = self._meta.split("|")
            # The chained operator's class must be importable before dill can rebuild it.
            if "flink/functions" in frag[0]:  # lambda function
                exec("from flink.functions." + frag[1] + " import " + frag[1])
            else:
                # Presumably user classes were serialized from the plan script run
                # as __main__; rename so they resolve against the "plan" module.
                self._chain_operator = self._chain_operator.replace(b"__main__", b"plan")
                exec("from plan import " + frag[1])
            # NOTE(review): exec/dill.loads execute arbitrary code; only trusted plan data may reach here.
            self._collector = dill.loads(self._chain_operator)
            self._collector.context = self.context
            self._collector._configure_chain(collector)
            self._collector._open()
        else:
            self._collector = collector

    def _chain(self, operator, meta):
        """Register a serialized operator (and its meta string) to be chained after this one."""
        self._chain_operator = operator
        self._meta = meta

    @abstractmethod
    def _run(self):
        """Process the whole input; implemented by every concrete operator."""
        pass

    def _open(self):
        """Hook called after this operator is configured as a chained operator."""
        pass

    def _close(self):
        """Close the downstream collector."""
        self._collector._close()

    def _go(self):
        """Entry point: receive broadcast variables, then run the operator."""
        self._receive_broadcast_variables()
        self._run()

    def _receive_broadcast_variables(self):
        """Read the broadcast variables that precede the regular input into ``self.context``.

        The stream starts with a count, followed by (name, values...) sections
        per variable; the iterator and connection are reset after each section.
        """
        broadcast_count = self._iterator.next()
        self._iterator._reset()
        self._connection.reset()
        for _ in range(broadcast_count):
            name = self._iterator.next()
            self._iterator._reset()
            self._connection.reset()
            bc = deque()
            while self._iterator.has_next():
                bc.append(self._iterator.next())
            self.context._add_broadcast_variable(name, bc)
            self._iterator._reset()
            self._connection.reset()
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
new file mode 100644
index 0000000..35359e2
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
@@ -0,0 +1,127 @@
+# ###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from collections import defaultdict
+from flink.functions import Function, RuntimeContext
+from flink.connection import Connection, Iterator, Collector
+from flink.plan.Constants import Order
+
+
class GroupReduceFunction(Function.Function):
    """Base class for user group-reduce functions.

    Three execution modes:
    * grouped reduce (``_run``): the input is split into groups by ``_keys``
      via ``Iterator.GroupIterator`` and ``reduce`` is called once per group;
    * unchained combine (``_run_combine``), selected when ``_combine`` is set;
    * chained combine (``collect``/``_close``): values are buffered, grouped
      by key, sorted by ``_sort_ops`` and passed to ``combine`` in batches.
    """
    def __init__(self):
        super(GroupReduceFunction, self).__init__()
        self._keys = None
        # (field, Order) pairs; presumably in sort_group call order (first = primary) — confirm with plan side
        self._sort_ops = []
        self._combine = False
        self._values = []  # buffer for the chained combine

    def _configure(self, input_file, output_file, port):
        """Connect to the Java side and select the run mode (combine or grouped reduce)."""
        self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
        self._iterator = Iterator.Iterator(self._connection)
        if self._combine:
            self._collector = Collector.Collector(self._connection)
            self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
            self._run = self._run_combine
        else:
            self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys)
            # Create the context before the chain so chained operators share it,
            # then record the collector the chain actually produced (it was
            # previously left as None).
            self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
            self._configure_chain(Collector.Collector(self._connection))
            self.context.collector = self._collector
        self._open()

    def _open(self):
        """Without grouping keys, every value is its own key."""
        if self._keys is None:
            self._extract_keys = self._extract_keys_id

    def _close(self):
        """Flush the chained-combine buffer, then close the collector."""
        self._sort_and_combine()
        self._collector._close()

    def _set_grouping_keys(self, keys):
        self._keys = keys

    def _set_sort_ops(self, ops):
        self._sort_ops = ops

    def _run(self):  # reduce
        """Call ``reduce`` once per group, forward any returned results, then signal the end."""
        connection = self._connection
        collector = self._collector
        function = self.reduce
        iterator = self._group_iterator
        iterator._init()
        while iterator.has_group():
            iterator.next_group()
            result = function(iterator, collector)
            if result is not None:
                for value in result:
                    collector.collect(value)
        collector._close()
        connection.send_end_signal()

    def _run_combine(self):  # unchained combine
        """Combine each incoming batch and signal its end; loops until the process is stopped."""
        connection = self._connection
        collector = self._collector
        function = self.combine
        iterator = self._iterator
        while True:
            result = function(iterator, collector)
            if result is not None:
                for value in result:
                    collector.collect(value)
            connection.send_end_signal()
            connection.reset()

    def collect(self, value):  # chained combine
        """Buffer ``value``; combine the buffer once it holds more than 1000 values."""
        self._values.append(value)
        if len(self._values) > 1000:
            self._sort_and_combine()

    def _sort_and_combine(self):
        """Group the buffered values by key, sort each group and feed it to ``combine``."""
        function = self.combine
        collector = self._collector
        extractor = self._extract_keys
        grouping = defaultdict(list)
        for value in self._values:
            grouping[extractor(value)].append(value)
        for key in sorted(grouping):
            group = grouping[key]
            # list.sort is stable: applying the sort ops from last to first makes
            # the first op the primary key and later ops tie-breakers. The old
            # first-to-last order made the LAST op primary.
            for op in reversed(self._sort_ops):
                group.sort(key=lambda x, f=op[0]: x[f], reverse=op[1] == Order.DESCENDING)
            result = function(Iterator.ListIterator(group), collector)
            if result is not None:
                for res in result:
                    collector.collect(res)
        self._values = []

    def _extract_keys(self, x):
        return tuple([x[k] for k in self._keys])

    def _extract_sort_keys(self, x):
        # Previously read the nonexistent attribute ``_sort_keys``.
        return tuple(x[op[0]] for op in self._sort_ops)

    def _extract_keys_id(self, x):
        return x

    def reduce(self, iterator, collector):
        """Override: reduce one group; emit via ``collector`` and/or return an iterable."""
        pass

    def combine(self, iterator, collector):
        """Defaults to ``reduce``; its return value is propagated so returned results are not lost."""
        return self.reduce(iterator, collector)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/JoinFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/JoinFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/JoinFunction.py
new file mode 100644
index 0000000..0c1274b
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/JoinFunction.py
@@ -0,0 +1,33 @@
+# ###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function
+
+
class JoinFunction(Function.Function):
    """Base class for user join functions: one output per matched pair."""

    def __init__(self):
        super(JoinFunction, self).__init__()

    def _run(self):
        """Call ``join`` on both halves of each incoming pair, forward the result, then close."""
        out = self._collector
        joiner = self.join
        for pair in self._iterator:
            left = pair[0]
            right = pair[1]
            out.collect(joiner(left, right))
        out._close()

    def join(self, value1, value2):
        """Override: return the joined record for ``value1`` and ``value2``."""
        pass

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/MapFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/MapFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/MapFunction.py
new file mode 100644
index 0000000..882cfee
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/MapFunction.py
@@ -0,0 +1,36 @@
+# ###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function
+
+
class MapFunction(Function.Function):
    """Base class for user map functions: exactly one output per input."""

    def __init__(self):
        super(MapFunction, self).__init__()

    def _run(self):
        """Forward ``map(record)`` for every input record, then close the collector."""
        out = self._collector
        transform = self.map
        for record in self._iterator:
            out.collect(transform(record))
        out._close()

    def collect(self, value):
        """Chained entry point: forward ``map(value)`` downstream."""
        downstream = self._collector
        downstream.collect(self.map(value))

    def map(self, value):
        """Override: return the transformed value."""
        pass

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/MapPartitionFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/MapPartitionFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/MapPartitionFunction.py
new file mode 100644
index 0000000..38bff98
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/MapPartitionFunction.py
@@ -0,0 +1,34 @@
+# ###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function
+
+
class MapPartitionFunction(Function.Function):
    """Base class for user map-partition functions: sees the whole partition at once."""

    def __init__(self):
        super(MapPartitionFunction, self).__init__()

    def _run(self):
        """Hand the full input iterator to ``map_partition``, forward any returned results, then close."""
        out = self._collector
        produced = self.map_partition(self._iterator, out)
        if produced is not None:
            for item in produced:
                out.collect(item)
        out._close()

    def map_partition(self, iterator, collector):
        """Override: emit via ``collector`` and/or return an iterable of results."""
        pass

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py
new file mode 100644
index 0000000..7da1ef4
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py
@@ -0,0 +1,123 @@
+# ###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from collections import defaultdict
+from flink.functions import Function, RuntimeContext
+from flink.connection import Connection, Iterator, Collector
+
+
class ReduceFunction(Function.Function):
    """Base class for user reduce functions built on a pairwise ``reduce(value1, value2)``.

    Execution modes:
    * grouped reduce (``_run``) when ``_keys`` is set;
    * ungrouped reduce (``_run_allreduce``) when ``_keys`` is None;
    * unchained combine (``_run_combine``) when ``_combine`` is set;
    * chained combine (``collect``/``_close``): values are buffered and
      combined per key in batches.
    """
    def __init__(self):
        super(ReduceFunction, self).__init__()
        self._keys = None
        self._combine = False
        self._values = []  # buffer for the chained combine

    def _configure(self, input_file, output_file, port):
        """Connect to the Java side and select the run mode."""
        self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
        self._iterator = Iterator.Iterator(self._connection)
        if self._combine:
            self._collector = Collector.Collector(self._connection)
            self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
            self._run = self._run_combine
        else:
            if self._keys is None:
                self._run = self._run_allreduce
            else:
                self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys)
            # The context must exist before _configure_chain, which hands
            # self.context to any chained operator (it used to receive None).
            # The collector is recorded once the chain is in place.
            self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
            self._configure_chain(Collector.Collector(self._connection))
            self.context.collector = self._collector

    def _set_grouping_keys(self, keys):
        self._keys = keys

    def _close(self):
        """Flush the chained-combine buffer, then close the collector."""
        self._sort_and_combine()
        self._collector._close()

    def _run(self):  # grouped reduce
        """Fold each group with ``reduce`` and emit one result per non-empty group."""
        collector = self._collector
        function = self.reduce
        iterator = self._group_iterator
        iterator._init()
        while iterator.has_group():
            iterator.next_group()
            if iterator.has_next():
                base = iterator.next()
                for value in iterator:
                    base = function(base, value)
                # Emit only inside the guard: an empty group used to re-emit the
                # previous group's result (or raise NameError for the first one).
                collector.collect(base)
        collector._close()

    def _run_allreduce(self):  # ungrouped reduce
        """Fold the whole input with ``reduce`` and emit the result if there was any input."""
        collector = self._collector
        function = self.reduce
        iterator = self._iterator
        if iterator.has_next():
            base = iterator.next()
            for value in iterator:
                base = function(base, value)
            collector.collect(base)
        collector._close()

    def _run_combine(self):  # unchained combine
        """Combine each incoming batch into one value and signal its end; loops until stopped."""
        connection = self._connection
        collector = self._collector
        function = self.combine
        iterator = self._iterator
        while True:
            if iterator.has_next():
                base = iterator.next()
                while iterator.has_next():
                    base = function(base, iterator.next())
                # Emit only for non-empty batches; previously an empty batch
                # re-emitted the previous batch's result (duplicate data).
                collector.collect(base)
            connection.send_end_signal()
            connection.reset()

    def collect(self, value):  # chained combine
        """Buffer ``value``; combine the buffer once it holds more than 1000 values."""
        self._values.append(value)
        if len(self._values) > 1000:
            self._sort_and_combine()

    def _sort_and_combine(self):
        """Combine the buffered values per key (sorted key order) and emit one result per key."""
        function = self.combine
        collector = self._collector
        values = self._values
        if self._keys is None:
            # Ungrouped reduce: all buffered values form one group. _extract_keys
            # cannot be used here because it iterates over self._keys.
            groups = [values] if values else []
        else:
            extractor = self._extract_keys
            grouping = defaultdict(list)
            for value in values:
                grouping[extractor(value)].append(value)
            groups = [grouping[key] for key in sorted(grouping)]
        for group in groups:
            remaining = iter(group)
            base = next(remaining)
            for value in remaining:
                base = function(base, value)
            collector.collect(base)
        self._values = []

    def _extract_keys(self, x):
        return tuple([x[k] for k in self._keys])

    def reduce(self, value1, value2):
        """Override: return the combination of two values."""
        pass

    def combine(self, value1, value2):
        """Defaults to ``reduce``."""
        return self.reduce(value1, value2)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/RuntimeContext.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/RuntimeContext.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/RuntimeContext.py
new file mode 100644
index 0000000..2977eb5
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/RuntimeContext.py
@@ -0,0 +1,30 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
class RuntimeContext(object):
    """Per-task context exposed to user functions: iterator, collector and broadcast variables."""

    def __init__(self, iterator, collector):
        self.iterator = iterator
        self.collector = collector
        self.broadcast_variables = {}

    def _add_broadcast_variable(self, name, var):
        """Register ``var`` under ``name``, replacing any previous value."""
        self.broadcast_variables.update({name: var})

    def get_broadcast_variable(self, name):
        """Return the broadcast variable registered under ``name``; raises KeyError if unknown."""
        registry = self.broadcast_variables
        return registry[name]

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/__init__.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/__init__.py
new file mode 100644
index 0000000..d35bf39
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Constants.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Constants.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Constants.py
new file mode 100644
index 0000000..f60273f
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Constants.py
@@ -0,0 +1,106 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
class _Identifier(object):
    """String identifiers of plan operations.

    These values must be kept in sync with the corresponding Java constants.
    """
    # sorting / grouping
    SORT = "sort"
    GROUP = "groupby"
    # multi-input operations
    COGROUP = "cogroup"
    CROSS = "cross"
    CROSSH = "cross_h"
    CROSST = "cross_t"
    JOIN = "join"
    JOINH = "join_h"
    JOINT = "join_t"
    UNION = "union"
    # single-input operations
    FLATMAP = "flatmap"
    FILTER = "filter"
    MAP = "map"
    MAPPARTITION = "mappartition"
    GROUPREDUCE = "groupreduce"
    REDUCE = "reduce"
    PROJECTION = "projection"
    # sources
    SOURCE_CSV = "source_csv"
    SOURCE_TEXT = "source_text"
    SOURCE_VALUE = "source_value"
    # sinks
    SINK_CSV = "sink_csv"
    SINK_TEXT = "sink_text"
    SINK_PRINT = "sink_print"
    # broadcast variables
    BROADCAST = "broadcast"
+
+
class _Fields(object):
    """String keys naming the fields of an operation description.

    NOTE(review): presumably mirrored on the Java side like ``_Identifier`` — confirm before renaming.
    """
    BCVARS = "bcvars"
    CHILDREN = "children"
    COMBINE = "combine"
    COMBINEOP = "combineop"
    DELIMITER_FIELD = "del_f"
    DELIMITER_LINE = "del_l"
    FIELD = "field"
    ID = "id"
    IDENTIFIER = "identifier"
    KEY1 = "key1"
    KEY2 = "key2"
    KEYS = "keys"
    META = "meta"
    NAME = "name"
    OPERATOR = "operator"
    ORDER = "order"
    OTHER = "other_set"
    PARENT = "parent"
    PATH = "path"
    PROJECTIONS = "projections"
    SINKS = "sinks"
    TO_ERR = "to_error"
    TYPES = "types"
    VALUES = "values"
    WRITE_MODE = "write"
+
+
class WriteMode(object):
    """Integer codes for the write mode of file sinks."""
    NO_OVERWRITE, OVERWRITE = 0, 1
+
+
class Order(object):
    """Integer codes for sort orders."""
    NONE, ASCENDING, DESCENDING, ANY = range(4)
+
import sys

PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] == 3

# Sample values whose Python types stand for the declared column types.
if PY2 or PY3:
    BOOL = True
    INT = 1
    FLOAT = 2.5
    STRING = "type"
    BYTES = bytearray(b"byte")
if PY2:
    # Python 2 distinguishes int from long; Python 3 has no LONG sample.
    LONG = long(1)