You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/04/21 15:47:40 UTC
[04/10] flink git commit: [FLINK-671] Python API
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
new file mode 100644
index 0000000..7b1c5c5
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
@@ -0,0 +1,327 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from struct import unpack
+from collections import deque
+
+# The collection ABCs live in `_abcoll` on Python 2 and in `_collections_abc`
+# on Python 3. Only a failed import should trigger the fallback; a bare
+# `except:` would also hide KeyboardInterrupt/SystemExit.
+try:
+    import _abcoll as defIter
+except ImportError:
+    import _collections_abc as defIter
+
+from flink.connection.Constants import Types
+
+
+class ListIterator(defIter.Iterator):
+    """Iterator over an in-memory sequence that additionally offers has_next().
+
+    The given values are copied into a deque, so the caller's sequence is not
+    modified while this iterator is consumed from the front.
+    """
+
+    def __init__(self, values):
+        super(ListIterator, self).__init__()
+        self._values = deque(values)
+
+    def __next__(self):
+        # Python 3 iterator protocol; delegates to the Python 2 style next().
+        return self.next()
+
+    def next(self):
+        """Remove and return the next value.
+
+        Raises StopIteration once all values have been consumed.
+        """
+        if not self.has_next():
+            raise StopIteration
+        return self._values.popleft()
+
+    def has_next(self):
+        """Return True if at least one value is left, otherwise False."""
+        # Return a real bool instead of handing out the internal deque, which
+        # callers could mutate or mistake for a result.
+        return len(self._values) > 0
+
+
+class GroupIterator(defIter.Iterator):
+    """Iterates over one run of consecutive records that share the same key.
+
+    The wrapped iterator must provide has_next() and next(). One record is
+    always read ahead (self.cur); when its key differs from the key of the
+    current group the iterator reports itself as exhausted until next_group()
+    is called.
+
+    If keys is None the whole record is used as the key, otherwise the key is
+    the list of record fields at the given positions.
+    """
+
+    def __init__(self, iterator, keys=None):
+        super(GroupIterator, self).__init__()
+        self.iterator = iterator
+        self.key = None
+        self.keys = keys
+        if self.keys is None:
+            # No key positions given: shadow the extractor with the identity.
+            self._extract_keys = self._extract_keys_id
+        self.cur = None
+        self.empty = False
+
+    def _init(self):
+        """Read the first record and make its key the current group key."""
+        if not self.iterator.has_next():
+            self.empty = True
+            return
+        self.empty = False
+        self.cur = self.iterator.next()
+        self.key = self._extract_keys(self.cur)
+
+    def __next__(self):
+        return self.next()
+
+    def next(self):
+        """Return the next record of the current group.
+
+        Raises StopIteration when the current group is exhausted.
+        """
+        if not self.has_next():
+            raise StopIteration
+        current = self.cur
+        if self.iterator.has_next():
+            # Read ahead; a different key marks the end of this group.
+            self.cur = self.iterator.next()
+            if self.key != self._extract_keys(self.cur):
+                self.empty = True
+        else:
+            self.cur = None
+            self.empty = True
+        return current
+
+    def has_next(self):
+        """Return whether the current group still has a record to hand out."""
+        return (not self.empty) and self.key == self._extract_keys(self.cur)
+
+    def has_group(self):
+        """Return whether a read-ahead record (and thus a group) is pending."""
+        return self.cur is not None
+
+    def next_group(self):
+        """Switch to the group that starts at the read-ahead record."""
+        self.key = self._extract_keys(self.cur)
+        self.empty = False
+
+    def _extract_keys(self, x):
+        return [x[position] for position in self.keys]
+
+    def _extract_keys_id(self, x):
+        return x
+
+
+# Pairs up the groups of two inputs by key. Each call to next() selects the
+# pair of iterators (p1, p2) to hand to the user function; a side without a
+# matching group is represented by a DummyIterator.
+# NOTE(review): the `<` comparison in next() presumably relies on both inputs
+# arriving sorted by key - confirm against the caller.
+class CoGroupIterator(object):
+ # Values of self.match, describing the outcome of the previous next() call:
+ # NONE_REMAINED - both sides had the same key and were both handed out
+ # FIRST_REMAINED - only the second side was handed out; the first side's
+ # current group is still pending
+ # SECOND_REMAINED - only the first side was handed out; the second side's
+ # current group is still pending
+ # FIRST_EMPTY - the first side had no group left
+ # SECOND_EMPTY - the second side had no group left
+ NONE_REMAINED = 1
+ FIRST_REMAINED = 2
+ SECOND_REMAINED = 3
+ FIRST_EMPTY = 4
+ SECOND_EMPTY = 5
+
+ def __init__(self, c1, c2, k1, k2):
+ self.i1 = GroupIterator(c1, k1)
+ self.i2 = GroupIterator(c2, k2)
+ self.p1 = None
+ self.p2 = None
+ self.match = None
+ self.key = None
+
+ def _init(self):
+ self.i1._init()
+ self.i2._init()
+
+ # Selects the next pair of groups and stores it in p1/p2.
+ # Returns False when neither side has a group left, True otherwise.
+ def next(self):
+ first_empty = True
+ second_empty = True
+
+ # Once a side has been found empty it is not checked again.
+ if self.match != CoGroupIterator.FIRST_EMPTY:
+ if self.match == CoGroupIterator.FIRST_REMAINED:
+ # The pending group of the first side is reused; self.key still holds its key.
+ first_empty = False
+ else:
+ if self.i1.has_group():
+ self.i1.next_group()
+ self.key = self.i1.key
+ first_empty = False
+
+ if self.match != CoGroupIterator.SECOND_EMPTY:
+ if self.match == CoGroupIterator.SECOND_REMAINED:
+ second_empty = False
+ else:
+ if self.i2.has_group():
+ self.i2.next_group()
+ second_empty = False
+
+ if first_empty and second_empty:
+ return False
+ elif first_empty and (not second_empty):
+ self.p1 = DummyIterator()
+ self.p2 = self.i2
+ self.match = CoGroupIterator.FIRST_EMPTY
+ return True
+ elif (not first_empty) and second_empty:
+ self.p1 = self.i1
+ self.p2 = DummyIterator()
+ self.match = CoGroupIterator.SECOND_EMPTY
+ return True
+ else:
+ # Both sides have a group: hand out the side(s) with the smaller key.
+ if self.key == self.i2.key:
+ self.p1 = self.i1
+ self.p2 = self.i2
+ self.match = CoGroupIterator.NONE_REMAINED
+ elif self.key < self.i2.key:
+ self.p1 = self.i1
+ self.p2 = DummyIterator()
+ self.match = CoGroupIterator.SECOND_REMAINED
+ else:
+ self.p1 = DummyIterator()
+ self.p2 = self.i2
+ self.match = CoGroupIterator.FIRST_REMAINED
+ return True
+
+
+class Iterator(defIter.Iterator):
+    """Iterator over the records that a connection provides for one group.
+
+    The deserializer is created lazily on the first next() call and reused
+    for subsequent records until _reset() drops it.
+    """
+
+    def __init__(self, con, group=0):
+        super(Iterator, self).__init__()
+        self._connection = con
+        self._init = True
+        self._group = group
+        self._deserializer = None
+
+    def __next__(self):
+        return self.next()
+
+    def next(self):
+        """Deserialize and return the next record.
+
+        Raises StopIteration when the connection reports no further record.
+        """
+        if not self.has_next():
+            raise StopIteration
+        if self._deserializer is None:
+            self._deserializer = _get_deserializer(self._group, self._connection.read)
+        return self._deserializer.deserialize()
+
+    def has_next(self):
+        """Ask the connection whether this group has another record."""
+        return self._connection.has_next(self._group)
+
+    def _reset(self):
+        """Forget the cached deserializer so the next record re-creates it."""
+        self._deserializer = None
+
+
+class DummyIterator(Iterator):
+    """Iterator that is always exhausted; stands in for a missing group."""
+
+    def __init__(self):
+        # Start the super() lookup after Iterator, so Iterator.__init__
+        # (which requires a connection) is skipped.
+        super(Iterator, self).__init__()
+
+    def next(self):
+        raise StopIteration
+
+    def __next__(self):
+        raise StopIteration
+
+    def has_next(self):
+        return False
+
+
+def _get_deserializer(group, read, type=None):
+    """Create the deserializer matching a type identifier.
+
+    :param group: group index that is passed on to every read call
+    :param read: callable read(size, group) returning the requested bytes
+    :param type: type identifier; if None, one byte is read to obtain it
+    :return: a deserializer instance bound to read and group
+    :raises ValueError: if the type identifier matches none of the known types
+    """
+    if type is None:
+        # Read the identifier exactly once; the former recursive call would
+        # never terminate if read() itself returned None.
+        type = read(1, group)
+    known = (
+        (Types.TYPE_TUPLE, TupleDeserializer),
+        (Types.TYPE_BYTE, ByteDeserializer),
+        (Types.TYPE_BYTES, ByteArrayDeserializer),
+        (Types.TYPE_BOOLEAN, BooleanDeserializer),
+        (Types.TYPE_FLOAT, FloatDeserializer),
+        (Types.TYPE_DOUBLE, DoubleDeserializer),
+        (Types.TYPE_INTEGER, IntegerDeserializer),
+        (Types.TYPE_LONG, LongDeserializer),
+        (Types.TYPE_STRING, StringDeserializer),
+        (Types.TYPE_NULL, NullDeserializer),
+    )
+    for type_id, deserializer in known:
+        if type == type_id:
+            return deserializer(read, group)
+    # Fail here with a clear message instead of returning None and failing
+    # later with an AttributeError on deserialize().
+    raise ValueError("Unknown type identifier: %r" % (type,))
+
+
+class TupleDeserializer(object):
+    """Deserializes tuples field by field.
+
+    On construction the field count is read as a big-endian unsigned 32-bit
+    integer, followed by one nested deserializer lookup per field.
+    """
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+        (size,) = unpack(">I", self.read(4, self._group))
+        fields = []
+        for _ in range(size):
+            fields.append(_get_deserializer(self._group, self.read))
+        self.deserializer = fields
+
+    def deserialize(self):
+        """Return the next tuple, built from the field deserializers in order."""
+        values = [field.deserialize() for field in self.deserializer]
+        return tuple(values)
+
+
+class ByteDeserializer(object):
+    """Deserializes a single byte using the struct format ">c"."""
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+
+    def deserialize(self):
+        raw = self.read(1, self._group)
+        (value,) = unpack(">c", raw)
+        return value
+
+
+class ByteArrayDeserializer(object):
+    """Deserializes a length-prefixed byte sequence into a bytearray.
+
+    The length is read as a big-endian signed 32-bit integer.
+    """
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+
+    def deserialize(self):
+        (size,) = unpack(">i", self.read(4, self._group))
+        payload = self.read(size, self._group)
+        return bytearray(payload)
+
+
+class BooleanDeserializer(object):
+    """Deserializes a one-byte boolean (struct format ">?")."""
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+
+    def deserialize(self):
+        raw = self.read(1, self._group)
+        (value,) = unpack(">?", raw)
+        return value
+
+
+class FloatDeserializer(object):
+    """Deserializes a big-endian 4-byte float (struct format ">f")."""
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+
+    def deserialize(self):
+        raw = self.read(4, self._group)
+        (value,) = unpack(">f", raw)
+        return value
+
+
+class DoubleDeserializer(object):
+    """Deserializes a big-endian 8-byte double (struct format ">d")."""
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+
+    def deserialize(self):
+        raw = self.read(8, self._group)
+        (value,) = unpack(">d", raw)
+        return value
+
+
+class IntegerDeserializer(object):
+    """Deserializes a big-endian signed 32-bit integer (struct format ">i")."""
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+
+    def deserialize(self):
+        raw = self.read(4, self._group)
+        (value,) = unpack(">i", raw)
+        return value
+
+
+class LongDeserializer(object):
+    """Deserializes a big-endian signed 64-bit integer (struct format ">q")."""
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+
+    def deserialize(self):
+        raw = self.read(8, self._group)
+        (value,) = unpack(">q", raw)
+        return value
+
+
+class StringDeserializer(object):
+    """Deserializes a length-prefixed UTF-8 string.
+
+    The byte length is read as a big-endian signed 32-bit integer.
+    """
+
+    def __init__(self, read, group):
+        self.read = read
+        self._group = group
+
+    def deserialize(self):
+        (length,) = unpack(">i", self.read(4, self._group))
+        encoded = self.read(length, self._group)
+        return encoded.decode("utf-8")
+
+
+class NullDeserializer(object):
+    """Deserializer for null values; reads nothing and always yields None."""
+
+    def __init__(self, read, group):
+        self._group = group
+        self.read = read
+
+    def deserialize(self):
+        # A null value carries no payload, so nothing is read.
+        return None
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc
new file mode 100644
index 0000000..7f3cf94
Binary files /dev/null and b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc differ
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TriangleEnumeration.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TriangleEnumeration.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TriangleEnumeration.py
new file mode 100644
index 0000000..2727635
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TriangleEnumeration.py
@@ -0,0 +1,152 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import INT, Order
+from flink.functions.FlatMapFunction import FlatMapFunction
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+from flink.functions.ReduceFunction import ReduceFunction
+from flink.functions.MapFunction import MapFunction
+from flink.functions.JoinFunction import JoinFunction
+
+
+class EdgeDuplicator(FlatMapFunction):
+    """Emits every edge twice: once as given and once with swapped vertices."""
+
+    def flat_map(self, value, collector):
+        source, target = value[0], value[1]
+        collector.collect((source, target))
+        collector.collect((target, source))
+
+
+class DegreeCounter(GroupReduceFunction):
+    """Counts the distinct neighbours of the vertex shared by a group of edges.
+
+    For every neighbour one record (v1, d1, v2, d2) with v1 < v2 is emitted.
+    Only the degree of the group vertex is filled in; the other degree is 0.
+    """
+
+    def reduce(self, iterator, collector):
+        first = iterator.next()
+        group_vertex = first[0]
+        # The first neighbour is always kept, as in the original logic.
+        other_vertices = [first[1]]
+
+        while iterator.has_next():
+            other_vertex = iterator.next()[1]
+            # Skip self-references and neighbours that were seen before.
+            if other_vertex != group_vertex and other_vertex not in other_vertices:
+                other_vertices.append(other_vertex)
+
+        degree = len(other_vertices)
+
+        for other_vertex in other_vertices:
+            if group_vertex < other_vertex:
+                output_edge = (group_vertex, degree, other_vertex, 0)
+            else:
+                output_edge = (other_vertex, 0, group_vertex, degree)
+            collector.collect(output_edge)
+
+
+class DegreeJoiner(ReduceFunction):
+    """Merges two (v1, d1, v2, d2) records of the same edge.
+
+    A degree that is 0 in the first record is taken from the second record,
+    provided the other degree of the first record is set.
+    """
+
+    def reduce(self, value1, value2):
+        first = [value1[i] for i in range(4)]
+        second = [value2[i] for i in range(4)]
+
+        merged = list(first)
+        first_degree_missing = first[1] == 0
+        second_degree_missing = first[3] == 0
+        if first_degree_missing and not second_degree_missing:
+            merged[1] = second[1]
+        elif second_degree_missing and not first_degree_missing:
+            merged[3] = second[3]
+        return merged
+
+
+class EdgeByDegreeProjector(MapFunction):
+    """Drops the degrees and puts the vertex with the lower degree first."""
+
+    def map(self, value):
+        first_has_higher_degree = value[1] > value[3]
+        if first_has_higher_degree:
+            return (value[2], value[0])
+        return (value[0], value[2])
+
+
+class EdgeByIdProjector(MapFunction):
+    """Orders the two vertices of an edge so that the smaller one comes first."""
+
+    def map(self, value):
+        # The former local `edge` was never used and has been removed.
+        if value[0] > value[1]:
+            return (value[1], value[0])
+        else:
+            return (value[0], value[1])
+
+
+class TriadBuilder(GroupReduceFunction):
+    """Builds triads (open triangles) from the edges of one group.
+
+    Every new second vertex is combined with all second vertices seen so far;
+    the first vertex of the group's first edge is used for all triads.
+    """
+
+    def reduce(self, iterator, collector):
+        first = iterator.next()
+        first_edge = [first[0], first[1]]
+        vertices = [first_edge[1]]
+
+        while iterator.has_next():
+            record = iterator.next()
+            higher_vertex_id = [record[0], record[1]][1]
+
+            for lower_vertex_id in vertices:
+                collector.collect((first_edge[0], lower_vertex_id, higher_vertex_id))
+            vertices.append(higher_vertex_id)
+
+
+class TriadFilter(JoinFunction):
+    """Join function that keeps the first input of each joined pair."""
+
+    def join(self, value1, value2):
+        triad = value1
+        return triad
+
+
+# Example program: enumerates the triangles of a small hard-coded graph.
+if __name__ == "__main__":
+ env = get_environment()
+ edges = env.from_elements(
+ (1, 2), (1, 3), (1, 4), (1, 5), (2, 3), (2, 5), (3, 4), (3, 7), (3, 8), (5, 6), (7, 8))
+
+ # Duplicate each edge in both directions, count the neighbours per vertex and
+ # merge the two half-filled degree records of every edge into one.
+ edges_with_degrees = edges \
+ .flat_map(EdgeDuplicator(), [INT, INT]) \
+ .group_by(0) \
+ .sort_group(1, Order.ASCENDING) \
+ .reduce_group(DegreeCounter(), [INT, INT, INT, INT]) \
+ .group_by(0, 2) \
+ .reduce(DegreeJoiner())
+
+ # Project to plain edges, lower-degree vertex first.
+ edges_by_degree = edges_with_degrees \
+ .map(EdgeByDegreeProjector(), [INT, INT])
+
+ # Same edges, smaller vertex id first.
+ edges_by_id = edges_by_degree \
+ .map(EdgeByIdProjector(), [INT, INT])
+
+ # Build triads per first vertex and join their last two fields (1, 2)
+ # against the edge set (0, 1); TriadFilter returns the triad of each match.
+ triangles = edges_by_degree \
+ .group_by(0) \
+ .sort_group(1, Order.ASCENDING) \
+ .reduce_group(TriadBuilder(), [INT, INT, INT]) \
+ .join(edges_by_id) \
+ .where(1, 2) \
+ .equal_to(0, 1) \
+ .using(TriadFilter(), [INT, INT, INT])
+
+ triangles.output()
+
+ env.set_degree_of_parallelism(1)
+
+ env.execute(local=True)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/WordCount.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/WordCount.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/WordCount.py
new file mode 100644
index 0000000..8a89a6f
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/WordCount.py
@@ -0,0 +1,61 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import INT, STRING
+from flink.functions.FlatMapFunction import FlatMapFunction
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+
+
+class Tokenizer(FlatMapFunction):
+    """Splits a line into lower-cased words and emits (1, word) per word."""
+
+    def flat_map(self, value, collector):
+        words = value.lower().split()
+        for token in words:
+            collector.collect((1, token))
+
+
+class Adder(GroupReduceFunction):
+    """Sums the counts of all (count, word) records of one group."""
+
+    def reduce(self, iterator, collector):
+        first_count, word = iterator.next()
+        remaining = sum([record[0] for record in iterator])
+        collector.collect((first_count + remaining, word))
+
+
+# Example program: counts words, either from a text file or from built-in
+# sample data when no paths are given.
+if __name__ == "__main__":
+    env = get_environment()
+    if len(sys.argv) not in (1, 3):
+        sys.exit("Usage: ./bin/pyflink.sh WordCount[ - <text path> <result path>]")
+
+    # With two extra arguments the input is read from, and the result written
+    # to, the given paths.
+    use_files = len(sys.argv) == 3
+
+    if use_files:
+        data = env.read_text(sys.argv[1])
+    else:
+        data = env.from_elements("hello","world","hello","car","tree","data","hello")
+
+    # No trailing backslash after the last call: it used to continue the
+    # statement into the following blank line.
+    result = data \
+        .flat_map(Tokenizer(), (INT, STRING)) \
+        .group_by(1) \
+        .reduce_group(Adder(), (INT, STRING), combinable=True)
+
+    if use_files:
+        result.write_csv(sys.argv[2])
+    else:
+        result.output()
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/__init__.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/__init__.py
new file mode 100644
index 0000000..d35bf39
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py
new file mode 100644
index 0000000..c39caf7
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py
@@ -0,0 +1,53 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function, RuntimeContext
+from flink.connection import Iterator, Connection, Collector
+
+
+class CoGroupFunction(Function.Function):
+    """Base class for co-group functions; override co_group()."""
+
+    def __init__(self):
+        super(CoGroupFunction, self).__init__()
+        self._keys1 = None
+        self._keys2 = None
+
+    def _configure(self, input_file, output_file, port):
+        """Set up the connection, one iterator per input and the co-group iterator."""
+        self._connection = Connection.TwinBufferingUDPMappedFileConnection(input_file, output_file, port)
+        con = self._connection
+        self._iterator = Iterator.Iterator(con, 0)
+        self._iterator2 = Iterator.Iterator(con, 1)
+        self._cgiter = Iterator.CoGroupIterator(self._iterator, self._iterator2, self._keys1, self._keys2)
+        self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
+        self._configure_chain(Collector.Collector(con))
+
+    def _run(self):
+        """Call co_group() for every pair of groups and collect its results."""
+        collector = self._collector
+        groups = self._cgiter
+        co_group = self.co_group
+        groups._init()
+        while groups.next():
+            emitted = co_group(groups.p1, groups.p2, collector)
+            if emitted is not None:
+                for item in emitted:
+                    collector.collect(item)
+            # Consume whatever the user function left unread, first input first.
+            for side in (groups.p1, groups.p2):
+                while side.has_next():
+                    side.next()
+        collector._close()
+
+    def co_group(self, iterator1, iterator2, collector):
+        """Process one pair of groups; may return an iterable of results."""
+        pass
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CrossFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CrossFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CrossFunction.py
new file mode 100644
index 0000000..6657d45
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CrossFunction.py
@@ -0,0 +1,34 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function
+
+
+class CrossFunction(Function.Function):
+    """Base class for cross functions; override cross()."""
+
+    def __init__(self):
+        super(CrossFunction, self).__init__()
+
+    def _run(self):
+        """Apply cross() to both elements of every incoming pair."""
+        collector = self._collector
+        cross = self.cross
+        for pair in self._iterator:
+            left, right = pair[0], pair[1]
+            collector.collect(cross(left, right))
+        collector._close()
+
+    def cross(self, value1, value2):
+        """Combine one element of each input into a result."""
+        pass
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/FilterFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/FilterFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/FilterFunction.py
new file mode 100644
index 0000000..fb4fb74
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/FilterFunction.py
@@ -0,0 +1,38 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function
+
+
+class FilterFunction(Function.Function):
+    """Base class for filter functions; override filter()."""
+
+    def __init__(self):
+        super(FilterFunction, self).__init__()
+
+    def _run(self):
+        """Forward every incoming value for which filter() is truthy."""
+        collector = self._collector
+        keep = self.filter
+        for value in self._iterator:
+            if not keep(value):
+                continue
+            collector.collect(value)
+        collector._close()
+
+    def collect(self, value):
+        """Forward a single value if filter() accepts it."""
+        if not self.filter(value):
+            return
+        self._collector.collect(value)
+
+    def filter(self, value):
+        """Return a truthy result to keep the value."""
+        pass
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/FlatMapFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/FlatMapFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/FlatMapFunction.py
new file mode 100644
index 0000000..ebc493c
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/FlatMapFunction.py
@@ -0,0 +1,44 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function
+
+
+class FlatMapFunction(Function.Function):
+    """Base class for flat-map functions; override flat_map()."""
+
+    def __init__(self):
+        super(FlatMapFunction, self).__init__()
+
+    def _run(self):
+        """Apply flat_map() to every incoming value and collect returned items."""
+        collector = self._collector
+        flat_map = self.flat_map
+        for value in self._iterator:
+            emitted = flat_map(value, collector)
+            if emitted is None:
+                # The function used the collector directly (or emitted nothing).
+                continue
+            for item in emitted:
+                collector.collect(item)
+        collector._close()
+
+    def collect(self, value):
+        """Apply flat_map() to a single value and collect returned items."""
+        collector = self._collector
+        emitted = self.flat_map(value, collector)
+        if emitted is None:
+            return
+        for item in emitted:
+            collector.collect(item)
+
+    def flat_map(self, value, collector):
+        """Emit results through the collector or return an iterable of them."""
+        pass
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
new file mode 100644
index 0000000..8e5227e
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
@@ -0,0 +1,92 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from abc import ABCMeta, abstractmethod
+import dill
+import sys
+from collections import deque
+from flink.connection import Connection, Iterator, Collector
+from flink.functions import RuntimeContext
+
+
+class Function(object):
+    """Abstract base class of the operator functions in this package.
+
+    Holds the connection, the input iterator, the output collector and the
+    runtime context of one operator. A further operator can be registered with
+    ``_chain``; ``_configure_chain`` then deserializes it and installs it as
+    ``self._collector`` in place of the plain collector.
+    """
+    __metaclass__ = ABCMeta
+
+    def __init__(self):
+        self._connection = None
+        self._iterator = None
+        self._collector = None
+        self.context = None
+        # Serialized chained operator and its "|"-separated description, both set by _chain().
+        self._chain_operator = None
+        self._meta = None
+
+    def _configure(self, input_file, output_file, port):
+        """Create connection, iterator, context and collector (chain) for this operator.
+
+        :param input_file: passed through to BufferingUDPMappedFileConnection
+        :param output_file: passed through to BufferingUDPMappedFileConnection
+        :param port: passed through to BufferingUDPMappedFileConnection
+        """
+        self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+        self._iterator = Iterator.Iterator(self._connection)
+        # The context has to exist before the chain is configured, because
+        # _configure_chain hands it on to the chained operator.
+        self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
+        self._configure_chain(Collector.Collector(self._connection))
+        # self._collector was still None when the context was created above;
+        # publish the real collector now so context.collector is usable.
+        self.context.collector = self._collector
+
+    def _configure_chain(self, collector):
+        """Install the output target of this operator.
+
+        Without a chained operator ``collector`` is used directly. Otherwise
+        the chained operator is deserialized, receives this operator's
+        context, is configured with ``collector`` itself and is opened.
+
+        :param collector: collector that terminates the chain
+        """
+        if self._chain_operator is None:
+            self._collector = collector
+            return
+        # _meta is split on "|": frag[0] is checked for "flink/functions",
+        # frag[1] is the name that gets imported.
+        frag = self._meta.split("|")
+        # SECURITY NOTE: exec() on a string built from the meta data and
+        # dill.loads() on the operator bytes both execute arbitrary code.
+        # Both inputs must only ever come from a trusted plan.
+        if "flink/functions" in frag[0]:  # lambda function
+            exec("from flink.functions." + frag[1] + " import " + frag[1])
+        else:
+            # Rewrite references to __main__ inside the serialized operator
+            # to the "plan" module that is imported right below.
+            self._chain_operator = self._chain_operator.replace(b"__main__", b"plan")
+            exec("from plan import " + frag[1])
+        self._collector = dill.loads(self._chain_operator)
+        self._collector.context = self.context
+        self._collector._configure_chain(collector)
+        self._collector._open()
+
+    def _chain(self, operator, meta):
+        """Remember a serialized operator and its meta string for ``_configure_chain``."""
+        self._chain_operator = operator
+        self._meta = meta
+
+    @abstractmethod
+    def _run(self):
+        """Process the input; has to be implemented by subclasses."""
+        pass
+
+    def _open(self):
+        """Hook called on a chained operator after it was configured; no-op by default."""
+        pass
+
+    def _close(self):
+        """Close the collector this operator writes to."""
+        self._collector._close()
+
+    def _go(self):
+        """Entry point: receive the broadcast variables, then run the operator."""
+        self._receive_broadcast_variables()
+        self._run()
+
+    def _receive_broadcast_variables(self):
+        """Read all broadcast variables from the iterator into the context.
+
+        First the number of variables is read; for each variable its name
+        follows and then its values until the iterator reports no further
+        element. Iterator and connection are reset after every one of these
+        read phases. The values of a variable are stored as a deque.
+        """
+        broadcast_count = self._iterator.next()
+        self._iterator._reset()
+        self._connection.reset()
+        for _ in range(broadcast_count):
+            name = self._iterator.next()
+            self._iterator._reset()
+            self._connection.reset()
+            bc = deque()
+            while self._iterator.has_next():
+                bc.append(self._iterator.next())
+            self.context._add_broadcast_variable(name, bc)
+            self._iterator._reset()
+            self._connection.reset()
+
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
new file mode 100644
index 0000000..35359e2
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
@@ -0,0 +1,127 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from collections import defaultdict
+from flink.functions import Function, RuntimeContext
+from flink.connection import Connection, Iterator, Collector
+from flink.plan.Constants import Order
+
+
+class GroupReduceFunction(Function.Function):
+    """Operator that hands groups of values to the user-defined ``reduce``.
+
+    Three processing paths exist:
+
+    * ``_run``: reads groups from a GroupIterator and reduces each of them.
+    * ``_run_combine``: replaces ``_run`` when ``self._combine`` is set
+      (unchained combine).
+    * ``collect`` / ``_sort_and_combine``: buffers pushed values and combines
+      them group by group (chained combine).
+    """
+
+    def __init__(self):
+        super(GroupReduceFunction, self).__init__()
+        # Positions of the grouping key fields; None means the whole value is the key (see _open).
+        self._keys = None
+        # (field, Order) pairs used to sort every group in _sort_and_combine.
+        self._sort_ops = []
+        self._combine = False
+        # Buffer filled by collect().
+        self._values = []
+
+    def _configure(self, input_file, output_file, port):
+        """Create connection, iterator, context and collector for the selected mode.
+
+        :param input_file: passed through to BufferingUDPMappedFileConnection
+        :param output_file: passed through to BufferingUDPMappedFileConnection
+        :param port: passed through to BufferingUDPMappedFileConnection
+        """
+        self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+        self._iterator = Iterator.Iterator(self._connection)
+        if self._combine:
+            self._collector = Collector.Collector(self._connection)
+            self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
+            self._run = self._run_combine
+        else:
+            self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys)
+            # The context must exist before the chain is configured, as it is
+            # handed on to a chained operator there.
+            self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
+            self._configure_chain(Collector.Collector(self._connection))
+            # self._collector was still None when the context was created.
+            self.context.collector = self._collector
+        self._open()
+
+    def _open(self):
+        """Without grouping keys, use the value itself as grouping key."""
+        if self._keys is None:
+            self._extract_keys = self._extract_keys_id
+
+    def _close(self):
+        """Combine whatever is still buffered, then close the collector."""
+        self._sort_and_combine()
+        self._collector._close()
+
+    def _set_grouping_keys(self, keys):
+        self._keys = keys
+
+    def _set_sort_ops(self, ops):
+        self._sort_ops = ops
+
+    def _run(self):  # reduce
+        """Call ``reduce`` once per group of the group iterator.
+
+        A non-None return value of ``reduce`` is iterated and every element is
+        collected. Afterwards the collector is closed and the end signal sent.
+        """
+        connection = self._connection
+        collector = self._collector
+        function = self.reduce
+        iterator = self._group_iterator
+        iterator._init()
+        while iterator.has_group():
+            iterator.next_group()
+            result = function(iterator, collector)
+            if result is not None:
+                for value in result:
+                    collector.collect(value)
+        collector._close()
+        connection.send_end_signal()
+
+    def _run_combine(self):  # unchained combine
+        """Call ``combine`` on the input iterator over and over again.
+
+        After every round the end signal is sent and the connection is reset.
+        The loop has no exit condition inside this method.
+        """
+        connection = self._connection
+        collector = self._collector
+        function = self.combine
+        iterator = self._iterator
+        while 1:
+            result = function(iterator, collector)
+            if result is not None:
+                for value in result:
+                    collector.collect(value)
+            connection.send_end_signal()
+            connection.reset()
+
+    def collect(self, value):  # chained combine
+        """Buffer ``value``; combine the buffer once it holds more than 1000 values."""
+        self._values.append(value)
+        if len(self._values) > 1000:
+            self._sort_and_combine()
+
+    def _sort_and_combine(self):
+        """Group the buffered values by key, combine every group and clear the buffer.
+
+        Groups are processed in ascending key order. Inside a group the values
+        are sorted according to ``self._sort_ops`` before ``combine`` is called
+        with a ListIterator over them.
+        """
+        function = self.combine
+        collector = self._collector
+        extractor = self._extract_keys
+        grouping = defaultdict(list)
+        for value in self._values:
+            grouping[extractor(value)].append(value)
+        for key in sorted(grouping):
+            group = grouping[key]
+            # NOTE(review): the ops are applied one after another with a
+            # stable sort, so the LAST op ends up as the primary criterion -
+            # confirm that this is the intended precedence.
+            for op in self._sort_ops:
+                group.sort(key=lambda x: x[op[0]], reverse=op[1] == Order.DESCENDING)
+            result = function(Iterator.ListIterator(group), collector)
+            if result is not None:
+                for res in result:
+                    collector.collect(res)
+        self._values = []
+
+    def _extract_keys(self, x):
+        """Return the grouping key of ``x``: a tuple of its fields at ``self._keys``."""
+        return tuple([x[k] for k in self._keys])
+
+    def _extract_sort_keys(self, x):
+        # NOTE(review): self._sort_keys is never assigned in this class, so
+        # this raises AttributeError unless it is set from outside - looks
+        # like a leftover, confirm before using it.
+        return tuple(x[k] for k in self._sort_keys)
+
+    def _extract_keys_id(self, x):
+        """Identity key extractor, installed by ``_open`` when no keys are set."""
+        return x
+
+    def reduce(self, iterator, collector):
+        """User hook, meant to be overridden; this default does nothing and returns None."""
+        pass
+
+    def combine(self, iterator, collector):
+        """Combine step; defaults to ``reduce``.
+
+        The result of ``reduce`` is returned so that values it returns or
+        yields are collected by the callers instead of being dropped.
+        """
+        return self.reduce(iterator, collector)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/JoinFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/JoinFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/JoinFunction.py
new file mode 100644
index 0000000..0c1274b
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/JoinFunction.py
@@ -0,0 +1,33 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function
+
+
+class JoinFunction(Function.Function):
+    """Operator that calls the user-defined ``join`` for every input pair."""
+
+    def __init__(self):
+        super(JoinFunction, self).__init__()
+
+    def _run(self):
+        """Join elements 0 and 1 of each input value, collect the result, then close."""
+        out = self._collector
+        join = self.join
+        for pair in self._iterator:
+            joined = join(pair[0], pair[1])
+            out.collect(joined)
+        out._close()
+
+    def join(self, value1, value2):
+        """User hook, meant to be overridden; this default does nothing and returns None."""
+        pass
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/MapFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/MapFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/MapFunction.py
new file mode 100644
index 0000000..882cfee
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/MapFunction.py
@@ -0,0 +1,36 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function
+
+
+class MapFunction(Function.Function):
+    """Operator that calls the user-defined ``map`` for every input value."""
+
+    def __init__(self):
+        super(MapFunction, self).__init__()
+
+    def _run(self):
+        """Map each value of the input iterator, collect the result, then close."""
+        out = self._collector
+        mapper = self.map
+        for element in self._iterator:
+            mapped = mapper(element)
+            out.collect(mapped)
+        out._close()
+
+    def collect(self, value):
+        """Map a single value and pass the result on to the collector."""
+        out = self._collector
+        out.collect(self.map(value))
+
+    def map(self, value):
+        """User hook, meant to be overridden; this default does nothing and returns None."""
+        pass
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/MapPartitionFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/MapPartitionFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/MapPartitionFunction.py
new file mode 100644
index 0000000..38bff98
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/MapPartitionFunction.py
@@ -0,0 +1,34 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function
+
+
+class MapPartitionFunction(Function.Function):
+    """Operator that hands the whole input iterator to ``map_partition`` in one call."""
+
+    def __init__(self):
+        super(MapPartitionFunction, self).__init__()
+
+    def _run(self):
+        """Call ``map_partition`` once, collect what it returns (if anything), then close."""
+        out = self._collector
+        produced = self.map_partition(self._iterator, out)
+        # A None result means there is nothing to collect on top of what the
+        # function may have emitted through the collector itself.
+        for item in (produced if produced is not None else ()):
+            out.collect(item)
+        out._close()
+
+    def map_partition(self, iterator, collector):
+        """User hook, meant to be overridden; this default does nothing and returns None."""
+        pass
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py
new file mode 100644
index 0000000..7da1ef4
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py
@@ -0,0 +1,123 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from collections import defaultdict
+from flink.functions import Function, RuntimeContext
+from flink.connection import Connection, Iterator, Collector
+
+
+class ReduceFunction(Function.Function):
+    """Operator that folds values pairwise with the user-defined ``reduce``.
+
+    Four processing paths exist:
+
+    * ``_run``: grouped reduce, one result per group of the GroupIterator.
+    * ``_run_allreduce``: replaces ``_run`` when no grouping keys are set.
+    * ``_run_combine``: replaces ``_run`` when ``self._combine`` is set
+      (unchained combine).
+    * ``collect`` / ``_sort_and_combine``: buffers pushed values and combines
+      them group by group (chained combine).
+    """
+
+    def __init__(self):
+        super(ReduceFunction, self).__init__()
+        # Positions of the grouping key fields; None selects the ungrouped reduce.
+        self._keys = None
+        self._combine = False
+        # Buffer filled by collect().
+        self._values = []
+
+    def _configure(self, input_file, output_file, port):
+        """Create connection, iterator, context and collector for the selected mode.
+
+        :param input_file: passed through to BufferingUDPMappedFileConnection
+        :param output_file: passed through to BufferingUDPMappedFileConnection
+        :param port: passed through to BufferingUDPMappedFileConnection
+        """
+        self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+        self._iterator = Iterator.Iterator(self._connection)
+        if self._combine:
+            self._collector = Collector.Collector(self._connection)
+            self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
+            self._run = self._run_combine
+            return
+        if self._keys is None:
+            self._run = self._run_allreduce
+        else:
+            self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys)
+        # Create the context BEFORE configuring the chain: the chain setup
+        # hands self.context on to a chained operator, which would otherwise
+        # end up with context = None.
+        self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
+        self._configure_chain(Collector.Collector(self._connection))
+        # The collector only exists now; publish it on the context.
+        self.context.collector = self._collector
+
+    def _set_grouping_keys(self, keys):
+        self._keys = keys
+
+    def _close(self):
+        """Combine whatever is still buffered, then close the collector."""
+        self._sort_and_combine()
+        self._collector._close()
+
+    def _run(self):  # grouped reduce
+        """Reduce every group of the group iterator to one value and collect it."""
+        collector = self._collector
+        function = self.reduce
+        iterator = self._group_iterator
+        iterator._init()
+        while iterator.has_group():
+            iterator.next_group()
+            if iterator.has_next():
+                base = iterator.next()
+                for value in iterator:
+                    base = function(base, value)
+            collector.collect(base)
+        collector._close()
+
+    def _run_allreduce(self):  # ungrouped reduce
+        """Reduce the whole input to one value and collect it; empty input collects nothing."""
+        collector = self._collector
+        function = self.reduce
+        iterator = self._iterator
+        if iterator.has_next():
+            base = iterator.next()
+            for value in iterator:
+                base = function(base, value)
+            collector.collect(base)
+        collector._close()
+
+    def _run_combine(self):  # unchained combine
+        """Combine the available input over and over again.
+
+        Every round folds the values currently offered by the iterator into
+        one, collects it, sends the end signal and resets the connection.
+        The loop has no exit condition inside this method.
+        """
+        connection = self._connection
+        collector = self._collector
+        function = self.combine
+        iterator = self._iterator
+        while 1:
+            if iterator.has_next():
+                base = iterator.next()
+                while iterator.has_next():
+                    base = function(base, iterator.next())
+            collector.collect(base)
+            connection.send_end_signal()
+            connection.reset()
+
+    def collect(self, value):  # chained combine
+        """Buffer ``value``; combine the buffer once it holds more than 1000 values."""
+        self._values.append(value)
+        if len(self._values) > 1000:
+            self._sort_and_combine()
+
+    def _sort_and_combine(self):
+        """Group the buffered values by key, fold every group into one value and clear the buffer.
+
+        Groups are processed in ascending key order.
+        """
+        function = self.combine
+        collector = self._collector
+        extractor = self._extract_keys
+        grouping = defaultdict(list)
+        for value in self._values:
+            grouping[extractor(value)].append(value)
+        for key in sorted(grouping):
+            # Every group holds at least one value, so the first next() is safe.
+            iterator = Iterator.ListIterator(grouping[key])
+            base = iterator.next()
+            while iterator.has_next():
+                base = function(base, iterator.next())
+            collector.collect(base)
+        self._values = []
+
+    def _extract_keys(self, x):
+        """Return the grouping key of ``x``: a tuple of its fields at ``self._keys``."""
+        # NOTE(review): raises TypeError while self._keys is None, i.e. if
+        # values are pushed through collect() on an ungrouped reduce -
+        # confirm that this combination cannot occur.
+        return tuple([x[k] for k in self._keys])
+
+    def reduce(self, value1, value2):
+        """User hook, meant to be overridden; this default does nothing and returns None."""
+        pass
+
+    def combine(self, value1, value2):
+        """Combine step; defaults to ``reduce``."""
+        return self.reduce(value1, value2)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/RuntimeContext.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/RuntimeContext.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/RuntimeContext.py
new file mode 100644
index 0000000..2977eb5
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/RuntimeContext.py
@@ -0,0 +1,30 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+class RuntimeContext(object):
+    """Bundles an iterator, a collector and the broadcast variables stored by name."""
+
+    def __init__(self, iterator, collector):
+        self.iterator = iterator
+        self.collector = collector
+        # Maps a broadcast variable name to its value.
+        self.broadcast_variables = {}
+
+    def _add_broadcast_variable(self, name, var):
+        """Store ``var`` under ``name``, replacing any earlier value of that name."""
+        self.broadcast_variables.update({name: var})
+
+    def get_broadcast_variable(self, name):
+        """Return the value stored under ``name``; raises KeyError for an unknown name."""
+        registry = self.broadcast_variables
+        return registry[name]
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/__init__.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/__init__.py
new file mode 100644
index 0000000..d35bf39
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Constants.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Constants.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Constants.py
new file mode 100644
index 0000000..f60273f
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Constants.py
@@ -0,0 +1,106 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+class _Identifier(object):
+    """
+    String identifiers. Must be kept in sync with the Java constants!
+    """
+    # data sources
+    SOURCE_CSV = "source_csv"
+    SOURCE_TEXT = "source_text"
+    SOURCE_VALUE = "source_value"
+
+    # data sinks
+    SINK_CSV = "sink_csv"
+    SINK_TEXT = "sink_text"
+    SINK_PRINT = "sink_print"
+
+    # everything else, in alphabetical order
+    BROADCAST = "broadcast"
+    COGROUP = "cogroup"
+    CROSS = "cross"
+    CROSSH = "cross_h"
+    CROSST = "cross_t"
+    FILTER = "filter"
+    FLATMAP = "flatmap"
+    GROUP = "groupby"
+    GROUPREDUCE = "groupreduce"
+    JOIN = "join"
+    JOINH = "join_h"
+    JOINT = "join_t"
+    MAP = "map"
+    MAPPARTITION = "mappartition"
+    PROJECTION = "projection"
+    REDUCE = "reduce"
+    SORT = "sort"
+    UNION = "union"
+
+
+class _Fields(object):
+    """String constants used as field names, listed alphabetically."""
+    # NOTE(review): presumably the keys of the plan/operation description that
+    # is exchanged with the Java side - confirm against the callers.
+    BCVARS = "bcvars"
+    CHILDREN = "children"
+    COMBINE = "combine"
+    COMBINEOP = "combineop"
+    DELIMITER_FIELD = "del_f"
+    DELIMITER_LINE = "del_l"
+    FIELD = "field"
+    ID = "id"
+    IDENTIFIER = "identifier"
+    KEY1 = "key1"
+    KEY2 = "key2"
+    KEYS = "keys"
+    META = "meta"
+    NAME = "name"
+    OPERATOR = "operator"
+    ORDER = "order"
+    OTHER = "other_set"
+    PARENT = "parent"
+    PATH = "path"
+    PROJECTIONS = "projections"
+    SINKS = "sinks"
+    TO_ERR = "to_error"
+    TYPES = "types"
+    VALUES = "values"
+    WRITE_MODE = "write"
+
+
+class WriteMode(object):
+    """Integer codes for the write mode options."""
+    NO_OVERWRITE, OVERWRITE = 0, 1
+
+
+class Order(object):
+    """Integer codes for the sort order options."""
+    NONE, ASCENDING, DESCENDING, ANY = 0, 1, 2, 3
+
+import sys
+
+_MAJOR_VERSION = sys.version_info[0]
+PY2 = _MAJOR_VERSION == 2
+PY3 = _MAJOR_VERSION == 3
+
+# One sample value per type name.
+# NOTE(review): presumably used as type markers by the plan API - confirm
+# against the callers.
+if PY2 or PY3:
+    BOOL = True
+    INT = 1
+    FLOAT = 2.5
+    STRING = "type"
+    BYTES = bytearray(b"byte")
+if PY2:
+    # The builtin `long` only exists on Python 2; LONG stays undefined on Python 3.
+    LONG = long(1)