Posted to commits@flink.apache.org by ch...@apache.org on 2015/11/12 21:09:21 UTC
[6/8] flink git commit: [FLINK-2901] Move PythonAPI to flink-libraries
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/MapFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/MapFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/MapFunction.py
new file mode 100644
index 0000000..882cfee
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/MapFunction.py
@@ -0,0 +1,36 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function
+
+
+class MapFunction(Function.Function):
+ def __init__(self):
+ super(MapFunction, self).__init__()
+
+ def _run(self):
+ collector = self._collector
+ function = self.map
+ for value in self._iterator:
+ collector.collect(function(value))
+ collector._close()
+
+ def collect(self, value):
+ self._collector.collect(self.map(value))
+
+ def map(self, value):
+ pass
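For reference, a minimal usage sketch of this class (not part of the commit): a user subclasses MapFunction, overrides map(), and hands an instance plus an example value of the output type to DataSet.map(). Values below are made up for illustration.

from flink.plan.Environment import get_environment
from flink.plan.Constants import INT
from flink.functions.MapFunction import MapFunction

class Doubler(MapFunction):
    def map(self, value):
        # called once per input element; returns exactly one output element
        return value * 2

env = get_environment()
data = env.from_elements(1, 2, 3)
data.map(Doubler(), INT).output()
env.execute(local=True)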
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/MapPartitionFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/MapPartitionFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/MapPartitionFunction.py
new file mode 100644
index 0000000..38bff98
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/MapPartitionFunction.py
@@ -0,0 +1,34 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function
+
+
+class MapPartitionFunction(Function.Function):
+ def __init__(self):
+ super(MapPartitionFunction, self).__init__()
+
+ def _run(self):
+ collector = self._collector
+ result = self.map_partition(self._iterator, collector)
+ if result is not None:
+ for res in result:
+ collector.collect(res)
+ collector._close()
+
+ def map_partition(self, iterator, collector):
+ pass
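A minimal usage sketch (not part of the commit), assuming the same plan API as above: map_partition() receives the whole parallel partition as an iterator and may either emit through the collector or return an iterable.

from flink.plan.Environment import get_environment
from flink.plan.Constants import INT
from flink.functions.MapPartitionFunction import MapPartitionFunction

class CountPartition(MapPartitionFunction):
    def map_partition(self, iterator, collector):
        # emits one element per parallel partition: the number of records seen
        count = 0
        for value in iterator:
            count += 1
        collector.collect(count)

env = get_environment()
data = env.from_elements(1, 2, 3, 4)
data.map_partition(CountPartition(), INT).output()
env.execute(local=True)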
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
new file mode 100644
index 0000000..ffa6de0
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
@@ -0,0 +1,123 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from collections import defaultdict
+from flink.functions import Function, RuntimeContext
+from flink.connection import Connection, Iterator, Collector
+
+
+class ReduceFunction(Function.Function):
+ def __init__(self):
+ super(ReduceFunction, self).__init__()
+ self._keys = None
+ self._combine = False
+ self._values = []
+
+ def _configure(self, input_file, output_file, port):
+ if self._combine:
+ self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
+ self._iterator = Iterator.Iterator(self._connection)
+ self._collector = Collector.Collector(self._connection)
+ self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
+ self._run = self._run_combine
+ else:
+ self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
+ self._iterator = Iterator.Iterator(self._connection)
+ if self._keys is None:
+ self._run = self._run_allreduce
+ else:
+ self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys)
+ self._configure_chain(Collector.Collector(self._connection))
+ self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
+
+ def _set_grouping_keys(self, keys):
+ self._keys = keys
+
+ def _close(self):
+ self._sort_and_combine()
+ self._collector._close()
+
+ def _run(self):#grouped reduce
+ collector = self._collector
+ function = self.reduce
+ iterator = self._group_iterator
+ iterator._init()
+ while iterator.has_group():
+ iterator.next_group()
+ if iterator.has_next():
+ base = iterator.next()
+ for value in iterator:
+ base = function(base, value)
+ collector.collect(base)
+ collector._close()
+
+ def _run_allreduce(self):#ungrouped reduce
+ collector = self._collector
+ function = self.reduce
+ iterator = self._iterator
+ if iterator.has_next():
+ base = iterator.next()
+ for value in iterator:
+ base = function(base, value)
+ collector.collect(base)
+ collector._close()
+
+ def _run_combine(self):#unchained combine
+ connection = self._connection
+ collector = self._collector
+ function = self.combine
+ iterator = self._iterator
+ while 1:
+ if iterator.has_next():
+ base = iterator.next()
+ while iterator.has_next():
+ base = function(base, iterator.next())
+ collector.collect(base)
+ connection.send_end_signal()
+ connection.reset()
+
+ def collect(self, value):#chained combine
+ self._values.append(value)
+ if len(self._values) > 1000:
+ self._sort_and_combine()
+
+ def _sort_and_combine(self):
+ values = self._values
+ function = self.combine
+ collector = self._collector
+ extractor = self._extract_keys
+ grouping = defaultdict(list)
+ for value in values:
+ grouping[extractor(value)].append(value)
+ keys = list(grouping.keys())
+ keys.sort()
+ for key in keys:
+ iterator = Iterator.ListIterator(grouping[key])
+ base = iterator.next()
+ while iterator.has_next():
+ base = function(base, iterator.next())
+ collector.collect(base)
+ self._values = []
+
+ def _extract_keys(self, x):
+ return tuple([x[k] for k in self._keys])
+
+ def reduce(self, value1, value2):
+ pass
+
+ def combine(self, value1, value2):
+ return self.reduce(value1, value2)
\ No newline at end of file
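A minimal usage sketch (not part of the commit): on a grouped DataSet, reduce() repeatedly combines two elements of the same group into one; combine() defaults to reduce() and backs the pre-aggregation path shown above. Values are made up for illustration.

from flink.plan.Environment import get_environment
from flink.functions.ReduceFunction import ReduceFunction

class SumCounts(ReduceFunction):
    def reduce(self, value1, value2):
        # combines two (key, count) tuples of one group into a single tuple
        return (value1[0], value1[1] + value2[1])

env = get_environment()
data = env.from_elements(("a", 1), ("b", 2), ("a", 3))
data.group_by(0).reduce(SumCounts()).output()
env.execute(local=True)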
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/RuntimeContext.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/RuntimeContext.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/RuntimeContext.py
new file mode 100644
index 0000000..2977eb5
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/RuntimeContext.py
@@ -0,0 +1,30 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+class RuntimeContext(object):
+ def __init__(self, iterator, collector):
+ self.iterator = iterator
+ self.collector = collector
+ self.broadcast_variables = dict()
+
+ def _add_broadcast_variable(self, name, var):
+ self.broadcast_variables[name] = var
+
+ def get_broadcast_variable(self, name):
+ return self.broadcast_variables[name]
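A sketch of how a broadcast variable registered via _add_broadcast_variable would be read back (not part of the commit). It assumes the Function base class exposes the RuntimeContext as self.context, as ReduceFunction._configure does above, and that the variable holds the elements of the broadcast DataSet as a list.

from flink.plan.Environment import get_environment
from flink.plan.Constants import INT
from flink.functions.MapFunction import MapFunction

class AddOffset(MapFunction):
    def map(self, value):
        # look up the broadcast variable by name; assumed to be a list of elements
        offset = self.context.get_broadcast_variable("offset")[0]
        return value + offset

env = get_environment()
offsets = env.from_elements(10)
data = env.from_elements(1, 2, 3)
data.map(AddOffset(), INT).with_broadcast_set("offset", offsets).output()
env.execute(local=True)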
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/__init__.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/__init__.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/__init__.py
new file mode 100644
index 0000000..d35bf39
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
new file mode 100644
index 0000000..f60273f
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
@@ -0,0 +1,106 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+class _Identifier(object):
+ """
+ Must be kept in sync with the Java constants!
+ """
+ SORT = "sort"
+ GROUP = "groupby"
+ COGROUP = "cogroup"
+ CROSS = "cross"
+ CROSSH = "cross_h"
+ CROSST = "cross_t"
+ FLATMAP = "flatmap"
+ FILTER = "filter"
+ MAPPARTITION = "mappartition"
+ GROUPREDUCE = "groupreduce"
+ JOIN = "join"
+ JOINH = "join_h"
+ JOINT = "join_t"
+ MAP = "map"
+ PROJECTION = "projection"
+ REDUCE = "reduce"
+ UNION = "union"
+ SOURCE_CSV = "source_csv"
+ SOURCE_TEXT = "source_text"
+ SOURCE_VALUE = "source_value"
+ SINK_CSV = "sink_csv"
+ SINK_TEXT = "sink_text"
+ SINK_PRINT = "sink_print"
+ BROADCAST = "broadcast"
+
+
+class _Fields(object):
+ PARENT = "parent"
+ OTHER = "other_set"
+ SINKS = "sinks"
+ IDENTIFIER = "identifier"
+ FIELD = "field"
+ ORDER = "order"
+ KEYS = "keys"
+ KEY1 = "key1"
+ KEY2 = "key2"
+ TYPES = "types"
+ OPERATOR = "operator"
+ META = "meta"
+ NAME = "name"
+ COMBINE = "combine"
+ DELIMITER_LINE = "del_l"
+ DELIMITER_FIELD = "del_f"
+ WRITE_MODE = "write"
+ PATH = "path"
+ VALUES = "values"
+ COMBINEOP = "combineop"
+ CHILDREN = "children"
+ BCVARS = "bcvars"
+ PROJECTIONS = "projections"
+ ID = "id"
+ TO_ERR = "to_error"
+
+
+class WriteMode(object):
+ NO_OVERWRITE = 0
+ OVERWRITE = 1
+
+
+class Order(object):
+ NONE = 0
+ ASCENDING = 1
+ DESCENDING = 2
+ ANY = 3
+
+import sys
+
+PY2 = sys.version_info[0] == 2
+PY3 = sys.version_info[0] == 3
+
+if PY2:
+ BOOL = True
+ INT = 1
+ LONG = long(1)
+ FLOAT = 2.5
+ STRING = "type"
+ BYTES = bytearray(b"byte")
+elif PY3:
+ BOOL = True
+ INT = 1
+ FLOAT = 2.5
+ STRING = "type"
+ BYTES = bytearray(b"byte")
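The type constants above are example values of each supported field type; operators declare their output type by passing such a value (or a tuple of them) as the types argument. A hedged sketch of that convention (file path made up):

from flink.plan.Environment import get_environment
from flink.plan.Constants import INT, STRING, FLOAT

env = get_environment()
# (INT, STRING, FLOAT) describes a CSV record with an int, a string and a float field
records = env.read_csv("file:///tmp/input.csv", (INT, STRING, FLOAT))
# plain functions are accepted too; the output type is again given by example values
records.map(lambda record: (record[1], record[2]), (STRING, FLOAT)).output()
env.execute(local=True)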
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
new file mode 100644
index 0000000..390a08d
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
@@ -0,0 +1,906 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import inspect
+import copy
+import types as TYPES
+
+from flink.plan.Constants import _Fields, _Identifier, WriteMode, STRING
+from flink.functions.CoGroupFunction import CoGroupFunction
+from flink.functions.FilterFunction import FilterFunction
+from flink.functions.FlatMapFunction import FlatMapFunction
+from flink.functions.CrossFunction import CrossFunction
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+from flink.functions.JoinFunction import JoinFunction
+from flink.functions.MapFunction import MapFunction
+from flink.functions.MapPartitionFunction import MapPartitionFunction
+from flink.functions.ReduceFunction import ReduceFunction
+
+def deduct_output_type(dataset):
+ skip = set([_Identifier.GROUP, _Identifier.SORT, _Identifier.UNION])
+ source = set([_Identifier.SOURCE_CSV, _Identifier.SOURCE_TEXT, _Identifier.SOURCE_VALUE])
+ default = set([_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.JOINT, _Identifier.JOINH, _Identifier.JOIN])
+
+ while True:
+ dataset_type = dataset[_Fields.IDENTIFIER]
+ if dataset_type in skip:
+ dataset = dataset[_Fields.PARENT]
+ continue
+ if dataset_type in source:
+ if dataset_type == _Identifier.SOURCE_TEXT:
+ return STRING
+ if dataset_type == _Identifier.SOURCE_VALUE:
+ return dataset[_Fields.VALUES][0]
+ if dataset_type == _Identifier.SOURCE_CSV:
+ return dataset[_Fields.TYPES]
+ if dataset_type == _Identifier.PROJECTION:
+ return tuple([deduct_output_type(dataset[_Fields.PARENT])[k] for k in dataset[_Fields.KEYS]])
+ if dataset_type in default:
+ if dataset[_Fields.OPERATOR] is not None: #udf-join/cross
+ return dataset[_Fields.TYPES]
+ if len(dataset[_Fields.PROJECTIONS]) == 0: #defaultjoin/-cross
+ return (deduct_output_type(dataset[_Fields.PARENT]), deduct_output_type(dataset[_Fields.OTHER]))
+ else: #projectjoin/-cross
+ t1 = deduct_output_type(dataset[_Fields.PARENT])
+ t2 = deduct_output_type(dataset[_Fields.OTHER])
+ out_type = []
+ for prj in dataset[_Fields.PROJECTIONS]:
+ if len(prj[1]) == 0: #projection on non-tuple dataset
+ if prj[0] == "first":
+ out_type.append(t1)
+ else:
+ out_type.append(t2)
+ else: #projection on tuple dataset
+ for key in prj[1]:
+ if prj[0] == "first":
+ out_type.append(t1[key])
+ else:
+ out_type.append(t2[key])
+ return tuple(out_type)
+ return dataset[_Fields.TYPES]
+
+
+class Set(object):
+ def __init__(self, env, info, copy_set=False):
+ self._env = env
+ self._info = info
+ if not copy_set:
+ self._info[_Fields.ID] = env._counter
+ self._info[_Fields.BCVARS] = []
+ self._info[_Fields.CHILDREN] = []
+ self._info[_Fields.SINKS] = []
+ self._info[_Fields.NAME] = None
+ env._counter += 1
+
+ def output(self, to_error=False):
+ """
+ Writes a DataSet to the standard output stream (stdout).
+ """
+ child = dict()
+ child[_Fields.IDENTIFIER] = _Identifier.SINK_PRINT
+ child[_Fields.PARENT] = self._info
+ child[_Fields.TO_ERR] = to_error
+ self._info[_Fields.SINKS].append(child)
+ self._env._sinks.append(child)
+
+ def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE):
+ """
+ Writes a DataSet as a text file to the specified location.
+
+ :param path: The path pointing to the location the text file is written to.
+ :param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten
+ """
+ child = dict()
+ child[_Fields.IDENTIFIER] = _Identifier.SINK_TEXT
+ child[_Fields.PARENT] = self._info
+ child[_Fields.PATH] = path
+ child[_Fields.WRITE_MODE] = write_mode
+ self._info[_Fields.SINKS].append(child)
+ self._env._sinks.append(child)
+
+ def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE):
+ """
+ Writes a Tuple DataSet as a CSV file to the specified location.
+
+ Note: Only a Tuple DataSet can be written as a CSV file.
+ :param path: The path pointing to the location the CSV file is written to.
+ :param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten
+ """
+ child = dict()
+ child[_Fields.IDENTIFIER] = _Identifier.SINK_CSV
+ child[_Fields.PATH] = path
+ child[_Fields.PARENT] = self._info
+ child[_Fields.DELIMITER_FIELD] = field_delimiter
+ child[_Fields.DELIMITER_LINE] = line_delimiter
+ child[_Fields.WRITE_MODE] = write_mode
+ self._info[_Fields.SINKS].append(child)
+ self._env._sinks.append(child)
+
+ def reduce_group(self, operator, types, combinable=False):
+ """
+ Applies a GroupReduce transformation.
+
+ The transformation calls a GroupReduceFunction once for each group of the DataSet, or once when applied on a
+ non-grouped DataSet.
+ The GroupReduceFunction can iterate over all elements of the DataSet and
+ emit any number of output elements including none.
+
+ :param operator: The GroupReduceFunction that is applied on the DataSet.
+ :param types: The type of the resulting DataSet.
+ :return:A GroupReduceOperator that represents the reduced DataSet.
+ """
+ if isinstance(operator, TYPES.FunctionType):
+ f = operator
+ operator = GroupReduceFunction()
+ operator.reduce = f
+ child = dict()
+ child_set = OperatorSet(self._env, child)
+ child[_Fields.IDENTIFIER] = _Identifier.GROUPREDUCE
+ child[_Fields.PARENT] = self._info
+ child[_Fields.OPERATOR] = copy.deepcopy(operator)
+ child[_Fields.OPERATOR]._combine = False
+ child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ child[_Fields.TYPES] = types
+ child[_Fields.COMBINE] = combinable
+ child[_Fields.COMBINEOP] = operator
+ child[_Fields.COMBINEOP]._combine = True
+ child[_Fields.NAME] = "PythonGroupReduce"
+ self._info[_Fields.CHILDREN].append(child)
+ self._env._sets.append(child)
+ return child_set
+
+
+class ReduceSet(Set):
+ def __init__(self, env, info, copy_set=False):
+ super(ReduceSet, self).__init__(env, info, copy_set)
+ if not copy_set:
+ self._is_chained = False
+
+ def reduce(self, operator):
+ """
+ Applies a Reduce transformation on a non-grouped DataSet.
+
+ The transformation consecutively calls a ReduceFunction until only a single element remains which is the result
+ of the transformation. A ReduceFunction combines two elements into one new element of the same type.
+
+ :param operator:The ReduceFunction that is applied on the DataSet.
+ :return:A ReduceOperator that represents the reduced DataSet.
+ """
+ if isinstance(operator, TYPES.FunctionType):
+ f = operator
+ operator = ReduceFunction()
+ operator.reduce = f
+ child = dict()
+ child_set = OperatorSet(self._env, child)
+ child[_Fields.IDENTIFIER] = _Identifier.REDUCE
+ child[_Fields.PARENT] = self._info
+ child[_Fields.OPERATOR] = operator
+ child[_Fields.COMBINEOP] = operator
+ child[_Fields.COMBINE] = False
+ child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ child[_Fields.NAME] = "PythonReduce"
+ child[_Fields.TYPES] = deduct_output_type(self._info)
+ self._info[_Fields.CHILDREN].append(child)
+ self._env._sets.append(child)
+ return child_set
+
+
+class DataSet(ReduceSet):
+ def __init__(self, env, info, copy_set=False):
+ super(DataSet, self).__init__(env, info, copy_set)
+
+ def project(self, *fields):
+ """
+ Applies a Project transformation on a Tuple DataSet.
+
+ Note: Only Tuple DataSets can be projected. The transformation projects each Tuple of the DataSet onto a
+ (sub)set of fields.
+
+ :param fields: The field indexes of the input tuples that are retained.
+ The order of fields in the output tuple corresponds to the order of field indexes.
+ :return: The projected DataSet.
+
+ """
+ child = dict()
+ child_set = DataSet(self._env, child)
+ child[_Fields.IDENTIFIER] = _Identifier.PROJECTION
+ child[_Fields.PARENT] = self._info
+ child[_Fields.KEYS] = fields
+ self._info[_Fields.CHILDREN].append(child)
+ self._env._sets.append(child)
+ return child_set
+
+ def group_by(self, *keys):
+ """
+ Groups a Tuple DataSet using field position keys.
+ Note: Field position keys can only be specified for Tuple DataSets.
+ The field position keys specify the fields of Tuples on which the DataSet is grouped.
+ This method returns an UnsortedGrouping on which one of the following grouping transformations can be applied.
+ sort_group() to get a SortedGrouping.
+ reduce() to apply a Reduce transformation.
+ reduce_group() to apply a GroupReduce transformation.
+
+ :param keys: One or more field positions on which the DataSet will be grouped.
+ :return:A Grouping on which a transformation needs to be applied to obtain a transformed DataSet.
+ """
+ child = dict()
+ child_chain = []
+ child_set = UnsortedGrouping(self._env, child, child_chain)
+ child[_Fields.IDENTIFIER] = _Identifier.GROUP
+ child[_Fields.PARENT] = self._info
+ child[_Fields.KEYS] = keys
+ child_chain.append(child)
+ self._info[_Fields.CHILDREN].append(child)
+ self._env._sets.append(child)
+ return child_set
+
+ def co_group(self, other_set):
+ """
+ Initiates a CoGroup transformation which combines the elements of two DataSets into one DataSet.
+
+ It groups each DataSet individually on a key and gives groups of both DataSets with equal keys together into a
+ CoGroupFunction. If a DataSet has a group with no matching key in the other DataSet,
+ the CoGroupFunction is called with an empty group for the non-existing group.
+ The CoGroupFunction can iterate over the elements of both groups and return any number of elements
+ including none.
+
+ :param other_set: The other DataSet of the CoGroup transformation.
+ :return:A CoGroupOperator to continue the definition of the CoGroup transformation.
+ """
+ child = dict()
+ other_set._info[_Fields.CHILDREN].append(child)
+ child_set = CoGroupOperatorWhere(self._env, child)
+ child[_Fields.IDENTIFIER] = _Identifier.COGROUP
+ child[_Fields.PARENT] = self._info
+ child[_Fields.OTHER] = other_set._info
+ self._info[_Fields.CHILDREN].append(child)
+ return child_set
+
+ def cross(self, other_set):
+ """
+ Initiates a Cross transformation which combines the elements of two DataSets into one DataSet.
+
+ It builds all pair combinations of elements of both DataSets, i.e., it builds a Cartesian product.
+
+ :param other_set: The other DataSet with which this DataSet is crossed.
+ :return:A CrossOperator to continue the definition of the Cross transformation.
+ """
+ return self._cross(other_set, _Identifier.CROSS)
+
+ def cross_with_huge(self, other_set):
+ """
+ Initiates a Cross transformation which combines the elements of two DataSets into one DataSet.
+
+ It builds all pair combinations of elements of both DataSets, i.e., it builds a Cartesian product.
+ This method also gives the hint to the optimizer that
+ the second DataSet to cross is much larger than the first one.
+
+ :param other_set: The other DataSet with which this DataSet is crossed.
+ :return:A CrossOperator to continue the definition of the Cross transformation.
+ """
+ return self._cross(other_set, _Identifier.CROSSH)
+
+ def cross_with_tiny(self, other_set):
+ """
+ Initiates a Cross transformation which combines the elements of two DataSets into one DataSet.
+
+ It builds all pair combinations of elements of both DataSets, i.e., it builds a Cartesian product.
+ This method also gives the hint to the optimizer that
+ the second DataSet to cross is much smaller than the first one.
+
+ :param other_set: The other DataSet with which this DataSet is crossed.
+ :return:A CrossOperator to continue the definition of the Cross transformation.
+ """
+ return self._cross(other_set, _Identifier.CROSST)
+
+ def _cross(self, other_set, identifier):
+ child = dict()
+ child_set = CrossOperator(self._env, child)
+ child[_Fields.IDENTIFIER] = identifier
+ child[_Fields.PARENT] = self._info
+ child[_Fields.OTHER] = other_set._info
+ child[_Fields.PROJECTIONS] = []
+ child[_Fields.OPERATOR] = None
+ child[_Fields.META] = None
+ self._info[_Fields.CHILDREN].append(child)
+ other_set._info[_Fields.CHILDREN].append(child)
+ self._env._sets.append(child)
+ return child_set
+
+ def filter(self, operator):
+ """
+ Applies a Filter transformation on a DataSet.
+
+ The transformation calls a FilterFunction for each element of the DataSet and retains only those elements
+ for which the function returns true. Elements for which the function returns false are filtered out.
+
+ :param operator: The FilterFunction that is called for each element of the DataSet.
+ :return:A FilterOperator that represents the filtered DataSet.
+ """
+ if isinstance(operator, TYPES.FunctionType):
+ f = operator
+ operator = FilterFunction()
+ operator.filter = f
+ child = dict()
+ child_set = OperatorSet(self._env, child)
+ child[_Fields.IDENTIFIER] = _Identifier.FILTER
+ child[_Fields.PARENT] = self._info
+ child[_Fields.OPERATOR] = operator
+ child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ child[_Fields.NAME] = "PythonFilter"
+ child[_Fields.TYPES] = deduct_output_type(self._info)
+ self._info[_Fields.CHILDREN].append(child)
+ self._env._sets.append(child)
+ return child_set
+
+ def flat_map(self, operator, types):
+ """
+ Applies a FlatMap transformation on a DataSet.
+
+ The transformation calls a FlatMapFunction for each element of the DataSet.
+ Each FlatMapFunction call can return any number of elements including none.
+
+ :param operator: The FlatMapFunction that is called for each element of the DataSet.
+ :param types: The type of the resulting DataSet.
+ :return:A FlatMapOperator that represents the transformed DataSet
+ """
+ if isinstance(operator, TYPES.FunctionType):
+ f = operator
+ operator = FlatMapFunction()
+ operator.flat_map = f
+ child = dict()
+ child_set = OperatorSet(self._env, child)
+ child[_Fields.IDENTIFIER] = _Identifier.FLATMAP
+ child[_Fields.PARENT] = self._info
+ child[_Fields.OPERATOR] = operator
+ child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ child[_Fields.TYPES] = types
+ child[_Fields.NAME] = "PythonFlatMap"
+ self._info[_Fields.CHILDREN].append(child)
+ self._env._sets.append(child)
+ return child_set
+
+ def join(self, other_set):
+ """
+ Initiates a Join transformation.
+
+ A Join transformation joins the elements of two DataSets on key equality.
+
+ :param other_set: The other DataSet with which this DataSet is joined
+ :return:A JoinOperator to continue the definition of the Join transformation.
+ """
+ return self._join(other_set, _Identifier.JOIN)
+
+ def join_with_huge(self, other_set):
+ """
+ Initiates a Join transformation.
+
+ A Join transformation joins the elements of two DataSets on key equality.
+ This method also gives the hint to the optimizer that
+ the second DataSet to join is much larger than the first one.
+
+ :param other_set: The other DataSet with which this DataSet is joined
+ :return:A JoinOperator to continue the definition of the Join transformation.
+ """
+ return self._join(other_set, _Identifier.JOINH)
+
+ def join_with_tiny(self, other_set):
+ """
+ Initiates a Join transformation.
+
+ A Join transformation joins the elements of two DataSets on key equality.
+ This method also gives the hint to the optimizer that
+ the second DataSet to join is much smaller than the first one.
+
+ :param other_set: The other DataSet with which this DataSet is joined
+ :return:A JoinOperator to continue the definition of the Join transformation.
+ """
+ return self._join(other_set, _Identifier.JOINT)
+
+ def _join(self, other_set, identifier):
+ child = dict()
+ child_set = JoinOperatorWhere(self._env, child)
+ child[_Fields.IDENTIFIER] = identifier
+ child[_Fields.PARENT] = self._info
+ child[_Fields.OTHER] = other_set._info
+ child[_Fields.OPERATOR] = None
+ child[_Fields.META] = None
+ child[_Fields.PROJECTIONS] = []
+ self._info[_Fields.CHILDREN].append(child)
+ other_set._info[_Fields.CHILDREN].append(child)
+ self._env._sets.append(child)
+ return child_set
+
+ def map(self, operator, types):
+ """
+ Applies a Map transformation on a DataSet.
+
+ The transformation calls a MapFunction for each element of the DataSet.
+ Each MapFunction call returns exactly one element.
+
+ :param operator: The MapFunction that is called for each element of the DataSet.
+ :param types: The type of the resulting DataSet
+ :return:A MapOperator that represents the transformed DataSet
+ """
+ if isinstance(operator, TYPES.FunctionType):
+ f = operator
+ operator = MapFunction()
+ operator.map = f
+ child = dict()
+ child_set = OperatorSet(self._env, child)
+ child[_Fields.IDENTIFIER] = _Identifier.MAP
+ child[_Fields.PARENT] = self._info
+ child[_Fields.OPERATOR] = operator
+ child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ child[_Fields.TYPES] = types
+ child[_Fields.NAME] = "PythonMap"
+ self._info[_Fields.CHILDREN].append(child)
+ self._env._sets.append(child)
+ return child_set
+
+ def map_partition(self, operator, types):
+ """
+ Applies a MapPartition transformation on a DataSet.
+
+ The transformation calls a MapPartitionFunction once per parallel partition of the DataSet.
+ The entire partition is available through the given Iterator.
+ Each MapPartitionFunction may return an arbitrary number of results.
+
+ The number of elements that each instance of the MapPartition function
+ sees is non-deterministic and depends on the degree of parallelism of the operation.
+
+ :param operator: The MapPartitionFunction that is called once per parallel partition of the DataSet.
+ :param types: The type of the resulting DataSet
+ :return:A MapOperator that represents the transformed DataSet
+ """
+ if isinstance(operator, TYPES.FunctionType):
+ f = operator
+ operator = MapPartitionFunction()
+ operator.map_partition = f
+ child = dict()
+ child_set = OperatorSet(self._env, child)
+ child[_Fields.IDENTIFIER] = _Identifier.MAPPARTITION
+ child[_Fields.PARENT] = self._info
+ child[_Fields.OPERATOR] = operator
+ child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ child[_Fields.TYPES] = types
+ child[_Fields.NAME] = "PythonMapPartition"
+ self._info[_Fields.CHILDREN].append(child)
+ self._env._sets.append(child)
+ return child_set
+
+ def union(self, other_set):
+ """
+ Creates a union of this DataSet with another DataSet.
+
+ The other DataSet must be of the same data type.
+
+ :param other_set: The other DataSet which is unioned with the current DataSet.
+ :return:The resulting DataSet.
+ """
+ child = dict()
+ child_set = DataSet(self._env, child)
+ child[_Fields.IDENTIFIER] = _Identifier.UNION
+ child[_Fields.PARENT] = self._info
+ child[_Fields.OTHER] = other_set._info
+ self._info[_Fields.CHILDREN].append(child)
+ other_set._info[_Fields.CHILDREN].append(child)
+ self._env._sets.append(child)
+ return child_set
+
+
+class OperatorSet(DataSet):
+ def __init__(self, env, info, copy_set=False):
+ super(OperatorSet, self).__init__(env, info, copy_set)
+
+ def with_broadcast_set(self, name, set):
+ child = dict()
+ child[_Fields.PARENT] = self._info
+ child[_Fields.OTHER] = set._info
+ child[_Fields.NAME] = name
+ self._info[_Fields.BCVARS].append(child)
+ set._info[_Fields.CHILDREN].append(child)
+ self._env._broadcast.append(child)
+ return self
+
+
+class Grouping(object):
+ def __init__(self, env, info, child_chain):
+ self._env = env
+ self._child_chain = child_chain
+ self._info = info
+ info[_Fields.ID] = env._counter
+ info[_Fields.CHILDREN] = []
+ info[_Fields.SINKS] = []
+ env._counter += 1
+
+ def reduce_group(self, operator, types, combinable=False):
+ """
+ Applies a GroupReduce transformation.
+
+ The transformation calls a GroupReduceFunction once for each group of the DataSet, or once when applied on a
+ non-grouped DataSet.
+ The GroupReduceFunction can iterate over all elements of the DataSet and
+ emit any number of output elements including none.
+
+ :param operator: The GroupReduceFunction that is applied on the DataSet.
+ :param types: The type of the resulting DataSet.
+ :return:A GroupReduceOperator that represents the reduced DataSet.
+ """
+ if isinstance(operator, TYPES.FunctionType):
+ f = operator
+ operator = GroupReduceFunction()
+ operator.reduce = f
+ operator._set_grouping_keys(self._child_chain[0][_Fields.KEYS])
+ operator._set_sort_ops([(x[_Fields.FIELD], x[_Fields.ORDER]) for x in self._child_chain[1:]])
+ child = dict()
+ child_set = OperatorSet(self._env, child)
+ child[_Fields.IDENTIFIER] = _Identifier.GROUPREDUCE
+ child[_Fields.PARENT] = self._info
+ child[_Fields.OPERATOR] = copy.deepcopy(operator)
+ child[_Fields.OPERATOR]._combine = False
+ child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ child[_Fields.TYPES] = types
+ child[_Fields.COMBINE] = combinable
+ child[_Fields.COMBINEOP] = operator
+ child[_Fields.COMBINEOP]._combine = True
+ child[_Fields.NAME] = "PythonGroupReduce"
+ self._info[_Fields.CHILDREN].append(child)
+ self._env._sets.append(child)
+
+ return child_set
+
+ def sort_group(self, field, order):
+ """
+ Sorts Tuple elements within a group on the specified field in the specified Order.
+
+ Note: Only groups of Tuple elements can be sorted.
+ Groups can be sorted by multiple fields by chaining sort_group() calls.
+
+ :param field:The Tuple field on which the group is sorted.
+ :param order: The Order in which the specified Tuple field is sorted. See DataSet.Order.
+ :return:A SortedGrouping with the specified order of group elements.
+ """
+ child = dict()
+ child_set = SortedGrouping(self._env, child, self._child_chain)
+ child[_Fields.IDENTIFIER] = _Identifier.SORT
+ child[_Fields.PARENT] = self._info
+ child[_Fields.FIELD] = field
+ child[_Fields.ORDER] = order
+ self._info[_Fields.CHILDREN].append(child)
+ self._child_chain.append(child)
+ self._env._sets.append(child)
+ return child_set
+
+
+class UnsortedGrouping(Grouping):
+ def __init__(self, env, info, child_chain):
+ super(UnsortedGrouping, self).__init__(env, info, child_chain)
+
+ def reduce(self, operator):
+ """
+ Applies a Reduce transformation on a grouped DataSet.
+
+ The transformation consecutively calls a ReduceFunction until only a single element per group remains, which is
+ the result of the transformation. A ReduceFunction combines two elements into one new element of the same type.
+
+ :param operator:The ReduceFunction that is applied on the DataSet.
+ :return:A ReduceOperator that represents the reduced DataSet.
+ """
+ operator._set_grouping_keys(self._child_chain[0][_Fields.KEYS])
+ for i in self._child_chain:
+ self._env._sets.append(i)
+ child = dict()
+ child_set = OperatorSet(self._env, child)
+ child[_Fields.IDENTIFIER] = _Identifier.REDUCE
+ child[_Fields.PARENT] = self._info
+ child[_Fields.OPERATOR] = copy.deepcopy(operator)
+ child[_Fields.OPERATOR]._combine = False
+ child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ child[_Fields.COMBINE] = True
+ child[_Fields.COMBINEOP] = operator
+ child[_Fields.COMBINEOP]._combine = True
+ child[_Fields.NAME] = "PythonReduce"
+ child[_Fields.TYPES] = deduct_output_type(self._info)
+ self._info[_Fields.CHILDREN].append(child)
+ self._env._sets.append(child)
+
+ return child_set
+
+
+class SortedGrouping(Grouping):
+ def __init__(self, env, info, child_chain):
+ super(SortedGrouping, self).__init__(env, info, child_chain)
+
+
+class CoGroupOperatorWhere(object):
+ def __init__(self, env, info):
+ self._env = env
+ self._info = info
+
+ def where(self, *fields):
+ """
+ Continues a CoGroup transformation.
+
+ Defines the Tuple fields of the first co-grouped DataSet that should be used as grouping keys.
+ Note: Fields can only be selected as grouping keys on Tuple DataSets.
+
+ :param fields: The indexes of the Tuple fields of the first co-grouped DataSet that should be used as keys.
+ :return: An incomplete CoGroup transformation.
+ """
+ self._info[_Fields.KEY1] = fields
+ return CoGroupOperatorTo(self._env, self._info)
+
+
+class CoGroupOperatorTo(object):
+ def __init__(self, env, info):
+ self._env = env
+ self._info = info
+
+ def equal_to(self, *fields):
+ """
+ Continues a CoGroup transformation.
+
+ Defines the Tuple fields of the second co-grouped DataSet that should be used as grouping keys.
+ Note: Fields can only be selected as grouping keys on Tuple DataSets.
+
+ :param fields: The indexes of the Tuple fields of the second co-grouped DataSet that should be used as keys.
+ :return: An incomplete CoGroup transformation.
+ """
+ self._info[_Fields.KEY2] = fields
+ return CoGroupOperatorUsing(self._env, self._info)
+
+
+class CoGroupOperatorUsing(object):
+ def __init__(self, env, info):
+ self._env = env
+ self._info = info
+
+ def using(self, operator, types):
+ """
+ Finalizes a CoGroup transformation.
+
+ Applies a CoGroupFunction to groups of elements with identical keys.
+ Each CoGroupFunction call returns an arbitrary number of elements.
+
+ :param operator: The CoGroupFunction that is called for all groups of elements with identical keys.
+ :param types: The type of the resulting DataSet.
+ :return:An CoGroupOperator that represents the co-grouped result DataSet.
+ """
+ if isinstance(operator, TYPES.FunctionType):
+ f = operator
+ operator = CoGroupFunction()
+ operator.co_group = f
+ new_set = OperatorSet(self._env, self._info)
+ operator._keys1 = self._info[_Fields.KEY1]
+ operator._keys2 = self._info[_Fields.KEY2]
+ self._info[_Fields.OPERATOR] = operator
+ self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ self._info[_Fields.TYPES] = types
+ self._info[_Fields.NAME] = "PythonCoGroup"
+ self._env._sets.append(self._info)
+ return new_set
+
+
+class JoinOperatorWhere(object):
+ def __init__(self, env, info):
+ self._env = env
+ self._info = info
+
+ def where(self, *fields):
+ """
+ Continues a Join transformation.
+
+ Defines the Tuple fields of the first join DataSet that should be used as join keys.
+ Note: Fields can only be selected as join keys on Tuple DataSets.
+
+ :param fields: The indexes of the Tuple fields of the first join DataSet that should be used as keys.
+ :return:An incomplete Join transformation.
+
+ """
+ self._info[_Fields.KEY1] = fields
+ return JoinOperatorTo(self._env, self._info)
+
+
+class JoinOperatorTo(object):
+ def __init__(self, env, info):
+ self._env = env
+ self._info = info
+
+ def equal_to(self, *fields):
+ """
+ Continues a Join transformation.
+
+ Defines the Tuple fields of the second join DataSet that should be used as join keys.
+ Note: Fields can only be selected as join keys on Tuple DataSets.
+
+ :param fields:The indexes of the Tuple fields of the second join DataSet that should be used as keys.
+ :return:An incomplete Join Transformation.
+ """
+ self._info[_Fields.KEY2] = fields
+ return JoinOperator(self._env, self._info)
+
+
+class JoinOperatorProjection(DataSet):
+ def __init__(self, env, info):
+ super(JoinOperatorProjection, self).__init__(env, info)
+
+ def project_first(self, *fields):
+ """
+ Initiates a ProjectJoin transformation.
+
+ Projects the first join input.
+ If the first join input is a Tuple DataSet, fields can be selected by their index.
+ If the first join input is not a Tuple DataSet, no parameters should be passed.
+
+ :param fields: The indexes of the selected fields.
+ :return: An incomplete JoinProjection.
+ """
+ self._info[_Fields.PROJECTIONS].append(("first", fields))
+ return self
+
+ def project_second(self, *fields):
+ """
+ Initiates a ProjectJoin transformation.
+
+ Projects the second join input.
+ If the second join input is a Tuple DataSet, fields can be selected by their index.
+ If the second join input is not a Tuple DataSet, no parameters should be passed.
+
+ :param fields: The indexes of the selected fields.
+ :return: An incomplete JoinProjection.
+ """
+ self._info[_Fields.PROJECTIONS].append(("second", fields))
+ return self
+
+
+class JoinOperator(DataSet):
+ def __init__(self, env, info):
+ super(JoinOperator, self).__init__(env, info)
+ self._info[_Fields.TYPES] = None
+
+ def project_first(self, *fields):
+ """
+ Initiates a ProjectJoin transformation.
+
+ Projects the first join input.
+ If the first join input is a Tuple DataSet, fields can be selected by their index.
+ If the first join input is not a Tuple DataSet, no parameters should be passed.
+
+ :param fields: The indexes of the selected fields.
+ :return: An incomplete JoinProjection.
+ """
+ return JoinOperatorProjection(self._env, self._info).project_first(*fields)
+
+ def project_second(self, *fields):
+ """
+ Initiates a ProjectJoin transformation.
+
+ Projects the second join input.
+ If the second join input is a Tuple DataSet, fields can be selected by their index.
+ If the second join input is not a Tuple DataSet, no parameters should be passed.
+
+ :param fields: The indexes of the selected fields.
+ :return: An incomplete JoinProjection.
+ """
+ return JoinOperatorProjection(self._env, self._info).project_second(*fields)
+
+ def using(self, operator, types):
+ """
+ Finalizes a Join transformation.
+
+ Applies a JoinFunction to each pair of joined elements. Each JoinFunction call returns exactly one element.
+
+ :param operator:The JoinFunction that is called for each pair of joined elements.
+ :param types: The type of the resulting DataSet.
+ :return:A Set that represents the joined result DataSet.
+ """
+ if isinstance(operator, TYPES.FunctionType):
+ f = operator
+ operator = JoinFunction()
+ operator.join = f
+ self._info[_Fields.OPERATOR] = operator
+ self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ self._info[_Fields.TYPES] = types
+ self._info[_Fields.NAME] = "PythonJoin"
+ self._env._sets.append(self._info)
+ return OperatorSet(self._env, self._info, copy_set=True)
+
+
+class CrossOperatorProjection(DataSet):
+ def __init__(self, env, info):
+ super(CrossOperatorProjection, self).__init__(env, info)
+
+ def project_first(self, *fields):
+ """
+ Initiates a ProjectCross transformation.
+
+ Projects the first join input.
+ If the first join input is a Tuple DataSet, fields can be selected by their index.
+ If the first join input is not a Tuple DataSet, no parameters should be passed.
+
+ :param fields: The indexes of the selected fields.
+ :return: An incomplete CrossProjection.
+ """
+ self._info[_Fields.PROJECTIONS].append(("first", fields))
+ return self
+
+ def project_second(self, *fields):
+ """
+ Initiates a ProjectCross transformation.
+
+ Projects the second join input.
+ If the second join input is a Tuple DataSet, fields can be selected by their index.
+ If the second join input is not a Tuple DataSet, no parameters should be passed.
+
+ :param fields: The indexes of the selected fields.
+ :return: An incomplete CrossProjection.
+ """
+ self._info[_Fields.PROJECTIONS].append(("second", fields))
+ return self
+
+
+class CrossOperator(DataSet):
+ def __init__(self, env, info):
+ super(CrossOperator, self).__init__(env, info)
+ info[_Fields.TYPES] = None
+
+ def project_first(self, *fields):
+ """
+ Initiates a ProjectCross transformation.
+
+ Projects the first join input.
+ If the first join input is a Tuple DataSet, fields can be selected by their index.
+ If the first join input is not a Tuple DataSet, no parameters should be passed.
+
+ :param fields: The indexes of the selected fields.
+ :return: An incomplete CrossProjection.
+ """
+ return CrossOperatorProjection(self._env, self._info).project_first(*fields)
+
+ def project_second(self, *fields):
+ """
+ Initiates a ProjectCross transformation.
+
+ Projects the second join input.
+ If the second join input is a Tuple DataSet, fields can be selected by their index.
+ If the second join input is not a Tuple DataSet, no parameters should be passed.
+
+ :param fields: The indexes of the selected fields.
+ :return: An incomplete CrossProjection.
+ """
+ return CrossOperatorProjection(self._env, self._info).project_second(*fields)
+
+ def using(self, operator, types):
+ """
+ Finalizes a Cross transformation.
+
+ Applies a CrossFunction to each pair of crossed elements. Each CrossFunction call returns exactly one element.
+
+ :param operator:The CrossFunction that is called for each pair of crossed elements.
+ :param types: The type of the resulting DataSet.
+ :return:A Set that represents the crossed result DataSet.
+ """
+ if isinstance(operator, TYPES.FunctionType):
+ f = operator
+ operator = CrossFunction()
+ operator.cross = f
+ self._info[_Fields.OPERATOR] = operator
+ self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ self._info[_Fields.TYPES] = types
+ self._info[_Fields.NAME] = "PythonCross"
+ return OperatorSet(self._env, self._info, copy_set=True)
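To tie the operators above together, a minimal end-to-end plan sketch (not part of the commit; paths and field layouts are made up): a default join with projections, a lambda-based filter, and CSV plus stdout sinks.

from flink.plan.Environment import get_environment
from flink.plan.Constants import INT, STRING, WriteMode

env = get_environment()
left = env.read_csv("file:///tmp/left.csv", (INT, STRING))
right = env.read_csv("file:///tmp/right.csv", (INT, INT))

# join on field 0 of both inputs, keeping the name from the left input
# and the value from the right input
joined = left.join(right).where(0).equal_to(0).project_first(1).project_second(1)

# plain functions are wrapped into the corresponding rich function automatically
positives = joined.filter(lambda record: record[1] > 0)

positives.write_csv("file:///tmp/out.csv", write_mode=WriteMode.OVERWRITE)
positives.output()

env.execute(local=True)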
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
new file mode 100644
index 0000000..236eda4
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -0,0 +1,345 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.connection import Connection
+from flink.connection import Collector
+from flink.plan.DataSet import DataSet
+from flink.plan.Constants import _Fields, _Identifier
+from flink.utilities import Switch
+import copy
+import sys
+
+
+def get_environment():
+ """
+ Creates an execution environment that represents the context in which the program is currently executed.
+
+ :return:The execution environment of the context in which the program is executed.
+ """
+ return Environment()
+
+
+class Environment(object):
+ def __init__(self):
+ # util
+ self._counter = 0
+
+ #parameters
+ self._parameters = []
+
+ #sets
+ self._sources = []
+ self._sets = []
+ self._sinks = []
+
+ #specials
+ self._broadcast = []
+
+ def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','):
+ """
+ Create a DataSet that represents the tuples produced by reading the given CSV file.
+
+ :param path: The path of the CSV file.
+ :param types: Specifies the types for the CSV fields.
+ :return:A DataSet that represents the data read from the given CSV file.
+ """
+ child = dict()
+ child_set = DataSet(self, child)
+ child[_Fields.IDENTIFIER] = _Identifier.SOURCE_CSV
+ child[_Fields.DELIMITER_LINE] = line_delimiter
+ child[_Fields.DELIMITER_FIELD] = field_delimiter
+ child[_Fields.PATH] = path
+ child[_Fields.TYPES] = types
+ self._sources.append(child)
+ return child_set
+
+ def read_text(self, path):
+ """
+ Creates a DataSet that represents the Strings produced by reading the given file line wise.
+
+ The file will be read with the system's default character set.
+
+ :param path: The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+ :return: A DataSet that represents the data read from the given file as text lines.
+ """
+ child = dict()
+ child_set = DataSet(self, child)
+ child[_Fields.IDENTIFIER] = _Identifier.SOURCE_TEXT
+ child[_Fields.PATH] = path
+ self._sources.append(child)
+ return child_set
+
+ def from_elements(self, *elements):
+ """
+ Creates a new data set that contains the given elements.
+
+ The elements must all be of the same type, for example, all Strings or all Integers.
+ The sequence of elements must not be empty.
+
+ :param elements: The elements to make up the data set.
+ :return: A DataSet representing the given list of elements.
+ """
+ child = dict()
+ child_set = DataSet(self, child)
+ child[_Fields.IDENTIFIER] = _Identifier.SOURCE_VALUE
+ child[_Fields.VALUES] = elements
+ self._sources.append(child)
+ return child_set
+
+ def set_degree_of_parallelism(self, degree):
+ """
+ Sets the degree of parallelism (DOP) for operations executed through this environment.
+
+ Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with x parallel instances.
+
+ :param degree: The degree of parallelism
+ """
+ self._parameters.append(("dop", degree))
+
+ def execute(self, local=False, debug=False):
+ """
+ Triggers the program execution.
+
+ The environment will execute all parts of the program that have resulted in a "sink" operation.
+ """
+ if debug:
+ local = True
+ self._parameters.append(("mode", local))
+ self._parameters.append(("debug", debug))
+ self._optimize_plan()
+
+ plan_mode = sys.stdin.readline().rstrip('\n') == "plan"
+
+ if plan_mode:
+ output_path = sys.stdin.readline().rstrip('\n')
+ self._connection = Connection.OneWayBusyBufferingMappedFileConnection(output_path)
+ self._collector = Collector.TypedCollector(self._connection)
+ self._send_plan()
+ self._connection._write_buffer()
+ else:
+ import struct
+ operator = None
+ try:
+ port = int(sys.stdin.readline().rstrip('\n'))
+
+ id = int(sys.stdin.readline().rstrip('\n'))
+ input_path = sys.stdin.readline().rstrip('\n')
+ output_path = sys.stdin.readline().rstrip('\n')
+
+ operator = None
+ for set in self._sets:
+ if set[_Fields.ID] == id:
+ operator = set[_Fields.OPERATOR]
+ if set[_Fields.ID] == -id:
+ operator = set[_Fields.COMBINEOP]
+ operator._configure(input_path, output_path, port)
+ operator._go()
+ sys.stdout.flush()
+ sys.stderr.flush()
+ except:
+ sys.stdout.flush()
+ sys.stderr.flush()
+ if operator is not None:
+ operator._connection._socket.send(struct.pack(">i", -2))
+ raise
+
+ def _optimize_plan(self):
+ self._find_chains()
+
+ def _find_chains(self):
+ udf = set([_Identifier.MAP, _Identifier.FLATMAP, _Identifier.FILTER, _Identifier.MAPPARTITION,
+ _Identifier.GROUPREDUCE, _Identifier.REDUCE, _Identifier.COGROUP,
+ _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST,
+ _Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT])
+ chainable = set([_Identifier.MAP, _Identifier.FILTER, _Identifier.FLATMAP, _Identifier.GROUPREDUCE, _Identifier.REDUCE])
+ multi_input = set([_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT, _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.COGROUP, _Identifier.UNION])
+ x = len(self._sets) - 1
+ while x > -1:
+ child = self._sets[x]
+ child_type = child[_Fields.IDENTIFIER]
+ if child_type in chainable:
+ parent = child[_Fields.PARENT]
+ parent_type = parent[_Fields.IDENTIFIER]
+ if len(parent[_Fields.SINKS]) == 0:
+ if child_type == _Identifier.GROUPREDUCE or child_type == _Identifier.REDUCE:
+ if child[_Fields.COMBINE]:
+ while parent_type == _Identifier.GROUP or parent_type == _Identifier.SORT:
+ parent = parent[_Fields.PARENT]
+ parent_type = parent[_Fields.IDENTIFIER]
+ if parent_type in udf and len(parent[_Fields.CHILDREN]) == 1:
+ if parent[_Fields.OPERATOR] is not None:
+ function = child[_Fields.COMBINEOP]
+ parent[_Fields.OPERATOR]._chain(function)
+ child[_Fields.COMBINE] = False
+ parent[_Fields.NAME] += " -> PythonCombine"
+ for bcvar in child[_Fields.BCVARS]:
+ bcvar_copy = copy.deepcopy(bcvar)
+ bcvar_copy[_Fields.PARENT] = parent
+ self._broadcast.append(bcvar_copy)
+ else:
+ if parent_type in udf and len(parent[_Fields.CHILDREN]) == 1:
+ parent_op = parent[_Fields.OPERATOR]
+ if parent_op is not None:
+ function = child[_Fields.OPERATOR]
+ parent_op._chain(function)
+ parent[_Fields.NAME] += " -> " + child[_Fields.NAME]
+ parent[_Fields.TYPES] = child[_Fields.TYPES]
+ for grand_child in child[_Fields.CHILDREN]:
+ if grand_child[_Fields.IDENTIFIER] in multi_input:
+ if grand_child[_Fields.PARENT][_Fields.ID] == child[_Fields.ID]:
+ grand_child[_Fields.PARENT] = parent
+ else:
+ grand_child[_Fields.OTHER] = parent
+ else:
+ grand_child[_Fields.PARENT] = parent
+ parent[_Fields.CHILDREN].append(grand_child)
+ parent[_Fields.CHILDREN].remove(child)
+ for sink in child[_Fields.SINKS]:
+ sink[_Fields.PARENT] = parent
+ parent[_Fields.SINKS].append(sink)
+ for bcvar in child[_Fields.BCVARS]:
+ bcvar[_Fields.PARENT] = parent
+ parent[_Fields.BCVARS].append(bcvar)
+ self._remove_set(child)
+ x -= 1
+
+ def _remove_set(self, set):
+ self._sets[:] = [s for s in self._sets if s[_Fields.ID] != set[_Fields.ID]]
+
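+ # The plan is sent through the collector as a flat stream: the environment
+ # parameters first, then the total node count, followed by every source,
+ # operation, sink and broadcast variable in turn.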
+ def _send_plan(self):
+ self._send_parameters()
+ self._collector.collect(len(self._sources) + len(self._sets) + len(self._sinks) + len(self._broadcast))
+ self._send_sources()
+ self._send_operations()
+ self._send_sinks()
+ self._send_broadcast()
+
+ def _send_parameters(self):
+ self._collector.collect(len(self._parameters))
+ for parameter in self._parameters:
+ self._collector.collect(parameter)
+
+ def _send_sources(self):
+ for source in self._sources:
+ identifier = source[_Fields.IDENTIFIER]
+ collect = self._collector.collect
+ collect(identifier)
+ collect(source[_Fields.ID])
+ for case in Switch(identifier):
+ if case(_Identifier.SOURCE_CSV):
+ collect(source[_Fields.PATH])
+ collect(source[_Fields.DELIMITER_FIELD])
+ collect(source[_Fields.DELIMITER_LINE])
+ collect(source[_Fields.TYPES])
+ break
+ if case(_Identifier.SOURCE_TEXT):
+ collect(source[_Fields.PATH])
+ break
+ if case(_Identifier.SOURCE_VALUE):
+ collect(len(source[_Fields.VALUES]))
+ for value in source[_Fields.VALUES]:
+ collect(value)
+ break
+
+ def _send_operations(self):
+ collect = self._collector.collect
+ for set in self._sets:
+ identifier = set[_Fields.IDENTIFIER]
+ collect(identifier)
+ collect(set[_Fields.ID])
+ collect(set[_Fields.PARENT][_Fields.ID])
+ for case in Switch(identifier):
+ if case(_Identifier.SORT):
+ collect(set[_Fields.FIELD])
+ collect(set[_Fields.ORDER])
+ break
+ if case(_Identifier.GROUP):
+ collect(set[_Fields.KEYS])
+ break
+ if case(_Identifier.COGROUP):
+ collect(set[_Fields.OTHER][_Fields.ID])
+ collect(set[_Fields.KEY1])
+ collect(set[_Fields.KEY2])
+ collect(set[_Fields.TYPES])
+ collect(set[_Fields.NAME])
+ break
+ if case(_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST):
+ collect(set[_Fields.OTHER][_Fields.ID])
+ collect(set[_Fields.TYPES])
+ collect(len(set[_Fields.PROJECTIONS]))
+ for p in set[_Fields.PROJECTIONS]:
+ collect(p[0])
+ collect(p[1])
+ collect(set[_Fields.NAME])
+ break
+ if case(_Identifier.REDUCE, _Identifier.GROUPREDUCE):
+ collect(set[_Fields.TYPES])
+ collect(set[_Fields.COMBINE])
+ collect(set[_Fields.NAME])
+ break
+ if case(_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT):
+ collect(set[_Fields.KEY1])
+ collect(set[_Fields.KEY2])
+ collect(set[_Fields.OTHER][_Fields.ID])
+ collect(set[_Fields.TYPES])
+ collect(len(set[_Fields.PROJECTIONS]))
+ for p in set[_Fields.PROJECTIONS]:
+ collect(p[0])
+ collect(p[1])
+ collect(set[_Fields.NAME])
+ break
+ if case(_Identifier.MAP, _Identifier.MAPPARTITION, _Identifier.FLATMAP, _Identifier.FILTER):
+ collect(set[_Fields.TYPES])
+ collect(set[_Fields.NAME])
+ break
+ if case(_Identifier.UNION):
+ collect(set[_Fields.OTHER][_Fields.ID])
+ break
+ if case(_Identifier.PROJECTION):
+ collect(set[_Fields.KEYS])
+ break
+ if case():
+ raise KeyError("Environment._send_child_sets(): Invalid operation identifier: " + str(identifier))
+
+ def _send_sinks(self):
+ for sink in self._sinks:
+ identifier = sink[_Fields.IDENTIFIER]
+ collect = self._collector.collect
+ collect(identifier)
+ collect(sink[_Fields.PARENT][_Fields.ID])
+ for case in Switch(identifier):
+ if case(_Identifier.SINK_CSV):
+ collect(sink[_Fields.PATH])
+ collect(sink[_Fields.DELIMITER_FIELD])
+ collect(sink[_Fields.DELIMITER_LINE])
+ collect(sink[_Fields.WRITE_MODE])
+ break
+ if case(_Identifier.SINK_TEXT):
+ collect(sink[_Fields.PATH])
+ collect(sink[_Fields.WRITE_MODE])
+ break
+ if case(_Identifier.SINK_PRINT):
+ collect(sink[_Fields.TO_ERR])
+ break
+
+ def _send_broadcast(self):
+ collect = self._collector.collect
+ for entry in self._broadcast:
+ collect(_Identifier.BROADCAST)
+ collect(entry[_Fields.PARENT][_Fields.ID])
+ collect(entry[_Fields.OTHER][_Fields.ID])
+ collect(entry[_Fields.NAME])
\ No newline at end of file
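Taken together, the methods above form the plan-building side of the Python API. A minimal sketch of how an Environment is driven (assuming the get_environment() factory from flink.plan.Environment used in the tests below; transformations and sinks are elided here):

    from flink.plan.Environment import get_environment

    if __name__ == "__main__":
        env = get_environment()
        data = env.from_elements(1, 2, 3)
        # ... apply transformations and at least one sink to `data` ...
        env.set_degree_of_parallelism(1)
        # execute() only runs the parts of the plan that end in a sink
        env.execute(local=True)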
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/__init__.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/__init__.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/__init__.py
new file mode 100644
index 0000000..d35bf39
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/utilities/__init__.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/utilities/__init__.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/utilities/__init__.py
new file mode 100644
index 0000000..faae78a
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/utilities/__init__.py
@@ -0,0 +1,36 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+class Switch(object):
+ def __init__(self, value):
+ self.value = value
+ self.fall = False
+
+ def __iter__(self):
+ yield self.match
+ return
+
+ def match(self, *args):
+ if self.fall or not args:
+ return True
+ elif self.value in args:
+ self.fall = True
+ return True
+ else:
+ return False
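The Switch helper above backs the for/case dispatch used throughout Environment._send_sources, _send_operations and _send_sinks. A minimal standalone sketch of the idiom, with hypothetical values rather than API identifiers:

    from flink.utilities import Switch

    def describe(value):
        for case in Switch(value):
            if case(1, 2):
                return "one or two"
            if case(3):
                return "three"
            if case():  # a no-argument case always matches (default)
                return "something else"

    print(describe(3))  # -> three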
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/setup.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/setup.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/setup.py
new file mode 100644
index 0000000..82447e9
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/setup.py
@@ -0,0 +1,33 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from distutils.core import setup
+
+setup(
+ name='flink',
+ version='1.0',
+ packages=['flink',
+ 'flink.connection',
+ 'flink.functions',
+ 'flink.plan',
+ 'flink.utilities'],
+ url='http://flink.apache.org',
+ license='Licensed under the Apache License, Version 2.0',
+ author='',
+ author_email='',
+ description='Flink Python API'
+)
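This is a plain distutils script; presumably it is run from the api directory containing the flink package (e.g. python setup.py install), after which plan scripts such as the tests below can import flink.plan, flink.functions and the other subpackages listed here.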
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
new file mode 100644
index 0000000..697731f
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import static org.apache.flink.python.api.PythonPlanBinder.ARGUMENT_PYTHON_2;
+import static org.apache.flink.python.api.PythonPlanBinder.ARGUMENT_PYTHON_3;
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PythonPlanBinderTest {
+ private static final Logger LOG = LoggerFactory.getLogger(PythonPlanBinder.class);
+
+ private static boolean python2Supported = true;
+ private static boolean python3Supported = true;
+ private static List<String> TEST_FILES;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ findTestFiles();
+ checkPythonSupport();
+ }
+
+ private static void findTestFiles() throws Exception {
+ TEST_FILES = new ArrayList<>();
+ FileSystem fs = FileSystem.getLocalFileSystem();
+ FileStatus[] status = fs.listStatus(
+ new Path(fs.getWorkingDirectory().toString()
+ + "/src/test/python/org/apache/flink/python/api"));
+ for (FileStatus f : status) {
+ String file = f.getPath().toString();
+ if (file.endsWith(".py")) {
+ TEST_FILES.add(file);
+ }
+ }
+ }
+
+ private static void checkPythonSupport() {
+ try {
+ Runtime.getRuntime().exec("python");
+ } catch (IOException ex) {
+ python2Supported = false;
+ LOG.info("No Python 2 runtime detected.");
+ }
+ try {
+ Runtime.getRuntime().exec("python3");
+ } catch (IOException ex) {
+ python3Supported = false;
+ LOG.info("No Python 3 runtime detected.");
+ }
+ }
+
+ @Test
+ public void testPython2() throws Exception {
+ if (python2Supported) {
+ for (String file : TEST_FILES) {
+ LOG.info("testing " + file);
+ PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_2, file});
+ }
+ }
+ }
+
+ @Test
+ public void testPython3() throws Exception {
+ if (python3Supported) {
+ for (String file : TEST_FILES) {
+ LOG.info("testing " + file);
+ PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_3, file});
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_csv
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_csv b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_csv
new file mode 100644
index 0000000..a103a5c
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_csv
@@ -0,0 +1,2 @@
+4,2,hello
+3,2,world
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_text
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_text b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_text
new file mode 100644
index 0000000..e7be084
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_text
@@ -0,0 +1,2 @@
+sup guys
+i am the world
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_csv.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_csv.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_csv.py
new file mode 100644
index 0000000..62b6a1d
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_csv.py
@@ -0,0 +1,31 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import INT, STRING
+from flink.plan.Constants import WriteMode
+
+if __name__ == "__main__":
+ env = get_environment()
+
+ d1 = env.read_csv("src/test/python/org/apache/flink/python/api/data_csv", (INT, INT, STRING))
+
+ d1.write_csv("/tmp/flink/result", line_delimiter="\n", field_delimiter="|", write_mode=WriteMode.OVERWRITE)
+
+ env.set_degree_of_parallelism(1)
+
+ env.execute(local=True)