Posted to commits@flink.apache.org by ch...@apache.org on 2015/11/12 21:09:21 UTC

[6/8] flink git commit: [FLINK-2901] Move PythonAPI to flink-libraries

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/MapFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/MapFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/MapFunction.py
new file mode 100644
index 0000000..882cfee
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/MapFunction.py
@@ -0,0 +1,36 @@
+# ###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function
+
+
+class MapFunction(Function.Function):
+    def __init__(self):
+        super(MapFunction, self).__init__()
+
+    def _run(self):
+        collector = self._collector
+        function = self.map
+        for value in self._iterator:
+            collector.collect(function(value))
+        collector._close()
+
+    def collect(self, value):
+        self._collector.collect(self.map(value))
+
+    def map(self, value):
+        pass
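
A minimal usage sketch for this class (not part of the diff above), assuming the rest of the API introduced by this commit, namely get_environment() from Environment.py and the INT type hint from Constants.py, both further down in this message; the element values are made up:

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import INT
    from flink.functions.MapFunction import MapFunction

    class Doubler(MapFunction):
        def map(self, value):
            # called once per element; must return exactly one element
            return value * 2

    env = get_environment()
    env.from_elements(1, 2, 3).map(Doubler(), INT).output()
    # env.execute() would then submit the plan; execute() is assumed to be defined
    # further down in Environment.py (outside the lines shown in this message)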

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/MapPartitionFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/MapPartitionFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/MapPartitionFunction.py
new file mode 100644
index 0000000..38bff98
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/MapPartitionFunction.py
@@ -0,0 +1,34 @@
+# ###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions import Function
+
+
+class MapPartitionFunction(Function.Function):
+    def __init__(self):
+        super(MapPartitionFunction, self).__init__()
+
+    def _run(self):
+        collector = self._collector
+        result = self.map_partition(self._iterator, collector)
+        if result is not None:
+            for res in result:
+                collector.collect(res)
+        collector._close()
+
+    def map_partition(self, iterator, collector):
+        pass
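
A corresponding usage sketch, again assuming get_environment() and INT from elsewhere in this commit; the per-partition counting is purely illustrative:

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import INT
    from flink.functions.MapPartitionFunction import MapPartitionFunction

    class CountPerPartition(MapPartitionFunction):
        def map_partition(self, iterator, collector):
            # the entire parallel partition is available through 'iterator';
            # results can be emitted via 'collector' or returned as an iterable (see _run above)
            count = 0
            for value in iterator:
                count += 1
            collector.collect(count)

    env = get_environment()
    env.from_elements(1, 2, 3, 4).map_partition(CountPerPartition(), INT).output()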

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
new file mode 100644
index 0000000..ffa6de0
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
@@ -0,0 +1,123 @@
+# ###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from collections import defaultdict
+from flink.functions import Function, RuntimeContext
+from flink.connection import Connection, Iterator, Collector
+
+
+class ReduceFunction(Function.Function):
+    def __init__(self):
+        super(ReduceFunction, self).__init__()
+        self._keys = None
+        self._combine = False
+        self._values = []
+
+    def _configure(self, input_file, output_file, port):
+        if self._combine:
+            self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
+            self._iterator = Iterator.Iterator(self._connection)
+            self._collector = Collector.Collector(self._connection)
+            self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
+            self._run = self._run_combine
+        else:
+            self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
+            self._iterator = Iterator.Iterator(self._connection)
+            if self._keys is None:
+                self._run = self._run_allreduce
+            else:
+                self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys)
+            self._configure_chain(Collector.Collector(self._connection))
+            self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
+
+    def _set_grouping_keys(self, keys):
+        self._keys = keys
+
+    def _close(self):
+        self._sort_and_combine()
+        self._collector._close()
+
+    def _run(self):#grouped reduce
+        collector = self._collector
+        function = self.reduce
+        iterator = self._group_iterator
+        iterator._init()
+        while iterator.has_group():
+            iterator.next_group()
+            if iterator.has_next():
+                base = iterator.next()
+                for value in iterator:
+                    base = function(base, value)
+            collector.collect(base)
+        collector._close()
+
+    def _run_allreduce(self):#ungrouped reduce
+        collector = self._collector
+        function = self.reduce
+        iterator = self._iterator
+        if iterator.has_next():
+            base = iterator.next()
+            for value in iterator:
+                base = function(base, value)
+            collector.collect(base)
+        collector._close()
+
+    def _run_combine(self):#unchained combine
+        connection = self._connection
+        collector = self._collector
+        function = self.combine
+        iterator = self._iterator
+        while 1:
+            if iterator.has_next():
+                base = iterator.next()
+                while iterator.has_next():
+                    base = function(base, iterator.next())
+            collector.collect(base)
+            connection.send_end_signal()
+            connection.reset()
+
+    def collect(self, value):#chained combine
+        self._values.append(value)
+        if len(self._values) > 1000:
+            self._sort_and_combine()
+
+    def _sort_and_combine(self):
+        values = self._values
+        function = self.combine
+        collector = self._collector
+        extractor = self._extract_keys
+        grouping = defaultdict(list)
+        for value in values:
+            grouping[extractor(value)].append(value)
+        keys = list(grouping.keys())
+        keys.sort()
+        for key in keys:
+            iterator = Iterator.ListIterator(grouping[key])
+            base = iterator.next()
+            while iterator.has_next():
+                base = function(base, iterator.next())
+            collector.collect(base)
+        self._values = []
+
+    def _extract_keys(self, x):
+        return tuple([x[k] for k in self._keys])
+
+    def reduce(self, value1, value2):
+        pass
+
+    def combine(self, value1, value2):
+        return self.reduce(value1, value2)
\ No newline at end of file
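
A grouped-reduce sketch using only classes visible in this commit; the (word, count) tuples are made up, and group_by()/reduce() come from DataSet.py further down:

    from flink.plan.Environment import get_environment
    from flink.functions.ReduceFunction import ReduceFunction

    class SumCounts(ReduceFunction):
        def reduce(self, value1, value2):
            # combines two elements of a group into one element of the same type;
            # combine() falls back to reduce() unless overridden (see above)
            return (value1[0], value1[1] + value2[1])

    env = get_environment()
    counts = env.from_elements(("a", 1), ("b", 2), ("a", 3))
    counts.group_by(0).reduce(SumCounts()).output()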

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/RuntimeContext.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/RuntimeContext.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/RuntimeContext.py
new file mode 100644
index 0000000..2977eb5
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/RuntimeContext.py
@@ -0,0 +1,30 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+class RuntimeContext(object):
+    def __init__(self, iterator, collector):
+        self.iterator = iterator
+        self.collector = collector
+        self.broadcast_variables = dict()
+
+    def _add_broadcast_variable(self, name, var):
+        self.broadcast_variables[name] = var
+
+    def get_broadcast_variable(self, name):
+        return self.broadcast_variables[name]
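
A sketch of how a broadcast variable would be read through this RuntimeContext, assuming the Function base class (not shown in this message) exposes it as self.context, as ReduceFunction._configure above does, and that the variable arrives as the list of elements of the broadcast DataSet; the name "factors" is made up:

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import INT
    from flink.functions.MapFunction import MapFunction

    class Scale(MapFunction):
        def map(self, value):
            # broadcast variables registered via with_broadcast_set() (see DataSet.py below)
            # are looked up by name through the RuntimeContext
            factors = self.context.get_broadcast_variable("factors")
            return value * factors[0]

    env = get_environment()
    data = env.from_elements(1, 2, 3)
    factors = env.from_elements(10)
    data.map(Scale(), INT).with_broadcast_set("factors", factors).output()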

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/__init__.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/__init__.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/__init__.py
new file mode 100644
index 0000000..d35bf39
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
new file mode 100644
index 0000000..f60273f
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
@@ -0,0 +1,106 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+class _Identifier(object):
+    """
+    Must be kept in sync with the Java constants!
+    """
+    SORT = "sort"
+    GROUP = "groupby"
+    COGROUP = "cogroup"
+    CROSS = "cross"
+    CROSSH = "cross_h"
+    CROSST = "cross_t"
+    FLATMAP = "flatmap"
+    FILTER = "filter"
+    MAPPARTITION = "mappartition"
+    GROUPREDUCE = "groupreduce"
+    JOIN = "join"
+    JOINH = "join_h"
+    JOINT = "join_t"
+    MAP = "map"
+    PROJECTION = "projection"
+    REDUCE = "reduce"
+    UNION = "union"
+    SOURCE_CSV = "source_csv"
+    SOURCE_TEXT = "source_text"
+    SOURCE_VALUE = "source_value"
+    SINK_CSV = "sink_csv"
+    SINK_TEXT = "sink_text"
+    SINK_PRINT = "sink_print"
+    BROADCAST = "broadcast"
+
+
+class _Fields(object):
+    PARENT = "parent"
+    OTHER = "other_set"
+    SINKS = "sinks"
+    IDENTIFIER = "identifier"
+    FIELD = "field"
+    ORDER = "order"
+    KEYS = "keys"
+    KEY1 = "key1"
+    KEY2 = "key2"
+    TYPES = "types"
+    OPERATOR = "operator"
+    META = "meta"
+    NAME = "name"
+    COMBINE = "combine"
+    DELIMITER_LINE = "del_l"
+    DELIMITER_FIELD = "del_f"
+    WRITE_MODE = "write"
+    PATH = "path"
+    VALUES = "values"
+    COMBINEOP = "combineop"
+    CHILDREN = "children"
+    BCVARS = "bcvars"
+    PROJECTIONS = "projections"
+    ID = "id"
+    TO_ERR = "to_error"
+
+
+class WriteMode(object):
+    NO_OVERWRITE = 0
+    OVERWRITE = 1
+
+
+class Order(object):
+    NONE = 0
+    ASCENDING = 1
+    DESCENDING = 2
+    ANY = 3
+
+import sys
+
+PY2 = sys.version_info[0] == 2
+PY3 = sys.version_info[0] == 3
+
+if PY2:
+    BOOL = True
+    INT = 1
+    LONG = long(1)
+    FLOAT = 2.5
+    STRING = "type"
+    BYTES = bytearray(b"byte")
+elif PY3:
+    BOOL = True
+    INT = 1
+    FLOAT = 2.5
+    STRING = "type"
+    BYTES = bytearray(b"byte")
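
The BOOL/INT/FLOAT/STRING/BYTES values above are sample instances that serve as type hints for the 'types' parameters of the operators in DataSet.py below. A brief sketch of how they and WriteMode would typically be passed (the paths are placeholders):

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import INT, STRING, WriteMode

    env = get_environment()

    # a CSV source of (word, count) tuples; the type hint describes the tuple layout
    data = env.read_csv("file:///tmp/input.csv", (STRING, INT))

    # WriteMode (and Order) are plain integer constants kept in sync with the Java side
    data.write_csv("file:///tmp/output.csv", write_mode=WriteMode.OVERWRITE)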

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
new file mode 100644
index 0000000..390a08d
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
@@ -0,0 +1,906 @@
+# ###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import inspect
+import copy
+import types as TYPES
+
+from flink.plan.Constants import _Fields, _Identifier, WriteMode, STRING
+from flink.functions.CoGroupFunction import CoGroupFunction
+from flink.functions.FilterFunction import FilterFunction
+from flink.functions.FlatMapFunction import FlatMapFunction
+from flink.functions.CrossFunction import CrossFunction
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+from flink.functions.JoinFunction import JoinFunction
+from flink.functions.MapFunction import MapFunction
+from flink.functions.MapPartitionFunction import MapPartitionFunction
+from flink.functions.ReduceFunction import ReduceFunction
+
+def deduct_output_type(dataset):
+    skip = set([_Identifier.GROUP, _Identifier.SORT, _Identifier.UNION])
+    source = set([_Identifier.SOURCE_CSV, _Identifier.SOURCE_TEXT, _Identifier.SOURCE_VALUE])
+    default = set([_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.JOINT, _Identifier.JOINH, _Identifier.JOIN])
+
+    while True:
+        dataset_type = dataset[_Fields.IDENTIFIER]
+        if dataset_type in skip:
+            dataset = dataset[_Fields.PARENT]
+            continue
+        if dataset_type in source:
+            if dataset_type == _Identifier.SOURCE_TEXT:
+                return STRING
+            if dataset_type == _Identifier.SOURCE_VALUE:
+                return dataset[_Fields.VALUES][0]
+            if dataset_type == _Identifier.SOURCE_CSV:
+                return dataset[_Fields.TYPES]
+        if dataset_type == _Identifier.PROJECTION:
+            return tuple([deduct_output_type(dataset[_Fields.PARENT])[k] for k in dataset[_Fields.KEYS]])
+        if dataset_type in default:
+            if dataset[_Fields.OPERATOR] is not None: #udf-join/cross
+                return dataset[_Fields.TYPES]
+            if len(dataset[_Fields.PROJECTIONS]) == 0: #defaultjoin/-cross
+                return (deduct_output_type(dataset[_Fields.PARENT]), deduct_output_type(dataset[_Fields.OTHER]))
+            else: #projectjoin/-cross
+                t1 = deduct_output_type(dataset[_Fields.PARENT])
+                t2 = deduct_output_type(dataset[_Fields.OTHER])
+                out_type = []
+                for prj in dataset[_Fields.PROJECTIONS]:
+                    if len(prj[1]) == 0: #projection on non-tuple dataset
+                        if prj[0] == "first":
+                            out_type.append(t1)
+                        else:
+                            out_type.append(t2)
+                    else: #projection on tuple dataset
+                        for key in prj[1]:
+                            if prj[0] == "first":
+                                out_type.append(t1[key])
+                            else:
+                                out_type.append(t2[key])
+                return tuple(out_type)
+        return dataset[_Fields.TYPES]
+
+
+class Set(object):
+    def __init__(self, env, info, copy_set=False):
+        self._env = env
+        self._info = info
+        if not copy_set:
+            self._info[_Fields.ID] = env._counter
+            self._info[_Fields.BCVARS] = []
+            self._info[_Fields.CHILDREN] = []
+            self._info[_Fields.SINKS] = []
+            self._info[_Fields.NAME] = None
+            env._counter += 1
+
+    def output(self, to_error=False):
+        """
+        Writes a DataSet to the standard output stream (stdout).
+
+        :param to_error: If True, the DataSet is written to standard error (stderr) instead.
+        """
+        child = dict()
+        child[_Fields.IDENTIFIER] = _Identifier.SINK_PRINT
+        child[_Fields.PARENT] = self._info
+        child[_Fields.TO_ERR] = to_error
+        self._info[_Fields.SINKS].append(child)
+        self._env._sinks.append(child)
+
+    def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE):
+        """
+        Writes a DataSet as a text file to the specified location.
+
+        :param path: The path pointing to the location the text file is written to.
+        :param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten
+        """
+        child = dict()
+        child[_Fields.IDENTIFIER] = _Identifier.SINK_TEXT
+        child[_Fields.PARENT] = self._info
+        child[_Fields.PATH] = path
+        child[_Fields.WRITE_MODE] = write_mode
+        self._info[_Fields.SINKS].append(child)
+        self._env._sinks.append(child)
+
+    def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE):
+        """
+        Writes a Tuple DataSet as a CSV file to the specified location.
+
+        Note: Only a Tuple DataSet can be written as a CSV file.
+        :param path: The path pointing to the location the CSV file is written to.
+        :param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten
+        """
+        child = dict()
+        child[_Fields.IDENTIFIER] = _Identifier.SINK_CSV
+        child[_Fields.PATH] = path
+        child[_Fields.PARENT] = self._info
+        child[_Fields.DELIMITER_FIELD] = field_delimiter
+        child[_Fields.DELIMITER_LINE] = line_delimiter
+        child[_Fields.WRITE_MODE] = write_mode
+        self._info[_Fields.SINKS].append(child)
+        self._env._sinks.append(child)
+
+    def reduce_group(self, operator, types, combinable=False):
+        """
+        Applies a GroupReduce transformation.
+
+        The transformation calls a GroupReduceFunction once for each group of the DataSet, or once when applied to a
+        non-grouped DataSet.
+        The GroupReduceFunction can iterate over all elements of the DataSet and
+        emit any number of output elements including none.
+
+        :param operator: The GroupReduceFunction that is applied on the DataSet.
+        :param types: The type of the resulting DataSet.
+        :return:A GroupReduceOperator that represents the reduced DataSet.
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = GroupReduceFunction()
+            operator.reduce = f
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.GROUPREDUCE
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = copy.deepcopy(operator)
+        child[_Fields.OPERATOR]._combine = False
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.TYPES] = types
+        child[_Fields.COMBINE] = combinable
+        child[_Fields.COMBINEOP] = operator
+        child[_Fields.COMBINEOP]._combine = True
+        child[_Fields.NAME] = "PythonGroupReduce"
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+
+class ReduceSet(Set):
+    def __init__(self, env, info, copy_set=False):
+        super(ReduceSet, self).__init__(env, info, copy_set)
+        if not copy_set:
+            self._is_chained = False
+
+    def reduce(self, operator):
+        """
+        Applies a Reduce transformation on a non-grouped DataSet.
+
+        The transformation consecutively calls a ReduceFunction until only a single element remains which is the result
+        of the transformation. A ReduceFunction combines two elements into one new element of the same type.
+
+        :param operator:The ReduceFunction that is applied on the DataSet.
+        :return:A ReduceOperator that represents the reduced DataSet.
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = ReduceFunction()
+            operator.reduce = f
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.REDUCE
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = operator
+        child[_Fields.COMBINEOP] = operator
+        child[_Fields.COMBINE] = False
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.NAME] = "PythonReduce"
+        child[_Fields.TYPES] = deduct_output_type(self._info)
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+
+class DataSet(ReduceSet):
+    def __init__(self, env, info, copy_set=False):
+        super(DataSet, self).__init__(env, info, copy_set)
+
+    def project(self, *fields):
+        """
+        Applies a Project transformation on a Tuple DataSet.
+
+        Note: Only Tuple DataSets can be projected. The transformation projects each Tuple of the DataSet onto a
+        (sub)set of fields.
+
+        :param fields: The field indexes of the input tuples that are retained.
+                        The order of fields in the output tuple corresponds to the order of field indexes.
+        :return: The projected DataSet.
+
+        """
+        child = dict()
+        child_set = DataSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.PROJECTION
+        child[_Fields.PARENT] = self._info
+        child[_Fields.KEYS] = fields
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def group_by(self, *keys):
+        """
+        Groups a Tuple DataSet using field position keys.
+        Note: Field position keys can only be specified for Tuple DataSets.
+        The field position keys specify the fields of Tuples on which the DataSet is grouped.
+        This method returns an UnsortedGrouping on which one of the following grouping transformations can be applied.
+        sort_group() to get a SortedGrouping.
+        reduce() to apply a Reduce transformation.
+        group_reduce() to apply a GroupReduce transformation.
+
+        :param keys: One or more field positions on which the DataSet will be grouped.
+        :return:A Grouping on which a transformation needs to be applied to obtain a transformed DataSet.
+        """
+        child = dict()
+        child_chain = []
+        child_set = UnsortedGrouping(self._env, child, child_chain)
+        child[_Fields.IDENTIFIER] = _Identifier.GROUP
+        child[_Fields.PARENT] = self._info
+        child[_Fields.KEYS] = keys
+        child_chain.append(child)
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def co_group(self, other_set):
+        """
+        Initiates a CoGroup transformation which combines the elements of two DataSets into one DataSet.
+
+        It groups each DataSet individually on a key and hands groups of both DataSets with equal keys to a
+        CoGroupFunction. If a DataSet has a group with no matching key in the other DataSet,
+        the CoGroupFunction is called with an empty group for the non-existing group.
+        The CoGroupFunction can iterate over the elements of both groups and return any number of elements
+        including none.
+
+        :param other_set: The other DataSet of the CoGroup transformation.
+        :return:A CoGroupOperator to continue the definition of the CoGroup transformation.
+        """
+        child = dict()
+        other_set._info[_Fields.CHILDREN].append(child)
+        child_set = CoGroupOperatorWhere(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.COGROUP
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OTHER] = other_set._info
+        self._info[_Fields.CHILDREN].append(child)
+        return child_set
+
+    def cross(self, other_set):
+        """
+        Initiates a Cross transformation which combines the elements of two DataSets into one DataSet.
+
+        It builds all pair combinations of elements of both DataSets, i.e., it builds a Cartesian product.
+
+        :param other_set: The other DataSet with which this DataSet is crossed.
+        :return:A CrossOperator to continue the definition of the Cross transformation.
+        """
+        return self._cross(other_set, _Identifier.CROSS)
+
+    def cross_with_huge(self, other_set):
+        """
+        Initiates a Cross transformation which combines the elements of two DataSets into one DataSet.
+
+        It builds all pair combinations of elements of both DataSets, i.e., it builds a Cartesian product.
+        This method also gives the hint to the optimizer that
+        the second DataSet to cross is much larger than the first one.
+
+        :param other_set: The other DataSet with which this DataSet is crossed.
+        :return:A CrossOperator to continue the definition of the Cross transformation.
+        """
+        return self._cross(other_set, _Identifier.CROSSH)
+
+    def cross_with_tiny(self, other_set):
+        """
+        Initiates a Cross transformation which combines the elements of two DataSets into one DataSet.
+
+        It builds all pair combinations of elements of both DataSets, i.e., it builds a Cartesian product.
+        This method also gives the hint to the optimizer that
+        the second DataSet to cross is much smaller than the first one.
+
+        :param other_set: The other DataSet with which this DataSet is crossed.
+        :return:A CrossOperator to continue the definition of the Cross transformation.
+        """
+        return self._cross(other_set, _Identifier.CROSST)
+
+    def _cross(self, other_set, identifier):
+        child = dict()
+        child_set = CrossOperator(self._env, child)
+        child[_Fields.IDENTIFIER] = identifier
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OTHER] = other_set._info
+        child[_Fields.PROJECTIONS] = []
+        child[_Fields.OPERATOR] = None
+        child[_Fields.META] = None
+        self._info[_Fields.CHILDREN].append(child)
+        other_set._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def filter(self, operator):
+        """
+        Applies a Filter transformation on a DataSet.
+
+        The transformation calls a FilterFunction for each element of the DataSet and retains only those elements
+        for which the function returns true. Elements for which the function returns false are filtered out.
+
+        :param operator: The FilterFunction that is called for each element of the DataSet.
+        :return:A FilterOperator that represents the filtered DataSet.
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = FilterFunction()
+            operator.filter = f
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.FILTER
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = operator
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.NAME] = "PythonFilter"
+        child[_Fields.TYPES] = deduct_output_type(self._info)
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def flat_map(self, operator, types):
+        """
+        Applies a FlatMap transformation on a DataSet.
+
+        The transformation calls a FlatMapFunction for each element of the DataSet.
+        Each FlatMapFunction call can return any number of elements including none.
+
+        :param operator: The FlatMapFunction that is called for each element of the DataSet.
+        :param types: The type of the resulting DataSet.
+        :return:A FlatMapOperator that represents the transformed DataSet
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = FlatMapFunction()
+            operator.flat_map = f
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.FLATMAP
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = operator
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.TYPES] = types
+        child[_Fields.NAME] = "PythonFlatMap"
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def join(self, other_set):
+        """
+        Initiates a Join transformation.
+
+        A Join transformation joins the elements of two DataSets on key equality.
+
+        :param other_set: The other DataSet with which this DataSet is joined
+        :return:A JoinOperator to continue the definition of the Join transformation.
+        """
+        return self._join(other_set, _Identifier.JOIN)
+
+    def join_with_huge(self, other_set):
+        """
+        Initiates a Join transformation.
+
+        A Join transformation joins the elements of two DataSets on key equality.
+        This method also gives the hint to the optimizer that
+        the second DataSet to join is much larger than the first one.
+
+        :param other_set: The other DataSet with which this DataSet is joined
+        :return:A JoinOperator to continue the definition of the Join transformation.
+        """
+        return self._join(other_set, _Identifier.JOINH)
+
+    def join_with_tiny(self, other_set):
+        """
+        Initiates a Join transformation.
+
+        A Join transformation joins the elements of two DataSets on key equality.
+        This method also gives the hint to the optimizer that
+        the second DataSet to join is much smaller than the first one.
+
+        :param other_set: The other DataSet with which this DataSet is joined
+        :return:A JoinOperator to continue the definition of the Join transformation.
+        """
+        return self._join(other_set, _Identifier.JOINT)
+
+    def _join(self, other_set, identifier):
+        child = dict()
+        child_set = JoinOperatorWhere(self._env, child)
+        child[_Fields.IDENTIFIER] = identifier
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OTHER] = other_set._info
+        child[_Fields.OPERATOR] = None
+        child[_Fields.META] = None
+        child[_Fields.PROJECTIONS] = []
+        self._info[_Fields.CHILDREN].append(child)
+        other_set._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def map(self, operator, types):
+        """
+        Applies a Map transformation on a DataSet.
+
+        The transformation calls a MapFunction for each element of the DataSet.
+        Each MapFunction call returns exactly one element.
+
+        :param operator: The MapFunction that is called for each element of the DataSet.
+        :param types: The type of the resulting DataSet
+        :return:A MapOperator that represents the transformed DataSet
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = MapFunction()
+            operator.map = f
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.MAP
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = operator
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.TYPES] = types
+        child[_Fields.NAME] = "PythonMap"
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def map_partition(self, operator, types):
+        """
+        Applies a MapPartition transformation on a DataSet.
+
+        The transformation calls a MapPartitionFunction once per parallel partition of the DataSet.
+        The entire partition is available through the given Iterator.
+        Each MapPartitionFunction may return an arbitrary number of results.
+
+        The number of elements that each instance of the MapPartition function
+        sees is non-deterministic and depends on the degree of parallelism of the operation.
+
+        :param operator: The MapPartitionFunction that is called once per parallel partition of the DataSet.
+        :param types: The type of the resulting DataSet
+        :return:A MapPartitionOperator that represents the transformed DataSet
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = MapPartitionFunction()
+            operator.map_partition = f
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.MAPPARTITION
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = operator
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.TYPES] = types
+        child[_Fields.NAME] = "PythonMapPartition"
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def union(self, other_set):
+        """
+        Creates a union of this DataSet with another DataSet.
+
+        The other DataSet must be of the same data type.
+
+        :param other_set: The other DataSet which is unioned with the current DataSet.
+        :return:The resulting DataSet.
+        """
+        child = dict()
+        child_set = DataSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.UNION
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OTHER] = other_set._info
+        self._info[_Fields.CHILDREN].append(child)
+        other_set._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+
+class OperatorSet(DataSet):
+    def __init__(self, env, info, copy_set=False):
+        super(OperatorSet, self).__init__(env, info, copy_set)
+
+    def with_broadcast_set(self, name, set):
+        """
+        Adds a DataSet as a broadcast variable to this operator.
+
+        The DataSet is registered under the given name and can be accessed inside the operator's function via the
+        RuntimeContext's get_broadcast_variable(name).
+
+        :param name: The name under which the broadcast variable is registered.
+        :param set: The DataSet to broadcast.
+        :return: This OperatorSet.
+        """
+        child = dict()
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OTHER] = set._info
+        child[_Fields.NAME] = name
+        self._info[_Fields.BCVARS].append(child)
+        set._info[_Fields.CHILDREN].append(child)
+        self._env._broadcast.append(child)
+        return self
+
+
+class Grouping(object):
+    def __init__(self, env, info, child_chain):
+        self._env = env
+        self._child_chain = child_chain
+        self._info = info
+        info[_Fields.ID] = env._counter
+        info[_Fields.CHILDREN] = []
+        info[_Fields.SINKS] = []
+        env._counter += 1
+
+    def reduce_group(self, operator, types, combinable=False):
+        """
+        Applies a GroupReduce transformation.
+
+        The transformation calls a GroupReduceFunction once for each group of the DataSet, or once when applied to a
+        non-grouped DataSet.
+        The GroupReduceFunction can iterate over all elements of the DataSet and
+        emit any number of output elements including none.
+
+        :param operator: The GroupReduceFunction that is applied on the DataSet.
+        :param types: The type of the resulting DataSet.
+        :return:A GroupReduceOperator that represents the reduced DataSet.
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = GroupReduceFunction()
+            operator.reduce = f
+        operator._set_grouping_keys(self._child_chain[0][_Fields.KEYS])
+        operator._set_sort_ops([(x[_Fields.FIELD], x[_Fields.ORDER]) for x in self._child_chain[1:]])
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.GROUPREDUCE
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = copy.deepcopy(operator)
+        child[_Fields.OPERATOR]._combine = False
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.TYPES] = types
+        child[_Fields.COMBINE] = combinable
+        child[_Fields.COMBINEOP] = operator
+        child[_Fields.COMBINEOP]._combine = True
+        child[_Fields.NAME] = "PythonGroupReduce"
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+
+        return child_set
+
+    def sort_group(self, field, order):
+        """
+        Sorts Tuple elements within a group on the specified field in the specified Order.
+
+        Note: Only groups of Tuple elements can be sorted.
+        Groups can be sorted by multiple fields by chaining sort_group() calls.
+
+        :param field:The Tuple field on which the group is sorted.
+        :param order: The Order in which the specified Tuple field is sorted. See DataSet.Order.
+        :return:A SortedGrouping with specified order of group element.
+        """
+        child = dict()
+        child_set = SortedGrouping(self._env, child, self._child_chain)
+        child[_Fields.IDENTIFIER] = _Identifier.SORT
+        child[_Fields.PARENT] = self._info
+        child[_Fields.FIELD] = field
+        child[_Fields.ORDER] = order
+        self._info[_Fields.CHILDREN].append(child)
+        self._child_chain.append(child)
+        self._env._sets.append(child)
+        return child_set
+
+
+class UnsortedGrouping(Grouping):
+    def __init__(self, env, info, child_chain):
+        super(UnsortedGrouping, self).__init__(env, info, child_chain)
+
+    def reduce(self, operator):
+        """
+        Applies a Reduce transformation on a grouped DataSet.
+
+        For each group, the transformation consecutively calls a ReduceFunction until only a single element per group
+        remains, which is the result of the transformation. A ReduceFunction combines two elements into one new
+        element of the same type.
+
+        :param operator:The ReduceFunction that is applied on the DataSet.
+        :return:A ReduceOperator that represents the reduced DataSet.
+        """
+        operator._set_grouping_keys(self._child_chain[0][_Fields.KEYS])
+        for i in self._child_chain:
+            self._env._sets.append(i)
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.REDUCE
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = copy.deepcopy(operator)
+        child[_Fields.OPERATOR]._combine = False
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.COMBINE] = True
+        child[_Fields.COMBINEOP] = operator
+        child[_Fields.COMBINEOP]._combine = True
+        child[_Fields.NAME] = "PythonReduce"
+        child[_Fields.TYPES] = deduct_output_type(self._info)
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+
+        return child_set
+
+
+class SortedGrouping(Grouping):
+    def __init__(self, env, info, child_chain):
+        super(SortedGrouping, self).__init__(env, info, child_chain)
+
+
+class CoGroupOperatorWhere(object):
+    def __init__(self, env, info):
+        self._env = env
+        self._info = info
+
+    def where(self, *fields):
+        """
+        Continues a CoGroup transformation.
+
+        Defines the Tuple fields of the first co-grouped DataSet that should be used as grouping keys.
+        Note: Fields can only be selected as grouping keys on Tuple DataSets.
+
+        :param fields: The indexes of the Tuple fields of the first co-grouped DataSets that should be used as keys.
+        :return: An incomplete CoGroup transformation.
+        """
+        self._info[_Fields.KEY1] = fields
+        return CoGroupOperatorTo(self._env, self._info)
+
+
+class CoGroupOperatorTo(object):
+    def __init__(self, env, info):
+        self._env = env
+        self._info = info
+
+    def equal_to(self, *fields):
+        """
+        Continues a CoGroup transformation.
+
+        Defines the Tuple fields of the second co-grouped DataSet that should be used as grouping keys.
+        Note: Fields can only be selected as grouping keys on Tuple DataSets.
+
+        :param fields: The indexes of the Tuple fields of the second co-grouped DataSet that should be used as keys.
+        :return: An incomplete CoGroup transformation.
+        """
+        self._info[_Fields.KEY2] = fields
+        return CoGroupOperatorUsing(self._env, self._info)
+
+
+class CoGroupOperatorUsing(object):
+    def __init__(self, env, info):
+        self._env = env
+        self._info = info
+
+    def using(self, operator, types):
+        """
+        Finalizes a CoGroup transformation.
+
+        Applies a CoGroupFunction to groups of elements with identical keys.
+        Each CoGroupFunction call returns an arbitrary number of elements.
+
+        :param operator: The CoGroupFunction that is called for all groups of elements with identical keys.
+        :param types: The type of the resulting DataSet.
+        :return:A CoGroupOperator that represents the co-grouped result DataSet.
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = CoGroupFunction()
+            operator.co_group = f
+        new_set = OperatorSet(self._env, self._info)
+        operator._keys1 = self._info[_Fields.KEY1]
+        operator._keys2 = self._info[_Fields.KEY2]
+        self._info[_Fields.OPERATOR] = operator
+        self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        self._info[_Fields.TYPES] = types
+        self._info[_Fields.NAME] = "PythonCoGroup"
+        self._env._sets.append(self._info)
+        return new_set
+
+
+class JoinOperatorWhere(object):
+    def __init__(self, env, info):
+        self._env = env
+        self._info = info
+
+    def where(self, *fields):
+        """
+        Continues a Join transformation.
+
+        Defines the Tuple fields of the first join DataSet that should be used as join keys.
+        Note: Fields can only be selected as join keys on Tuple DataSets.
+
+        :param fields: The indexes of the Tuple fields of the first join DataSets that should be used as keys.
+        :return:An incomplete Join transformation.
+
+        """
+        self._info[_Fields.KEY1] = fields
+        return JoinOperatorTo(self._env, self._info)
+
+
+class JoinOperatorTo(object):
+    def __init__(self, env, info):
+        self._env = env
+        self._info = info
+
+    def equal_to(self, *fields):
+        """
+        Continues a Join transformation.
+
+        Defines the Tuple fields of the second join DataSet that should be used as join keys.
+        Note: Fields can only be selected as join keys on Tuple DataSets.
+
+        :param fields:The indexes of the Tuple fields of the second join DataSet that should be used as keys.
+        :return:An incomplete Join Transformation.
+        """
+        self._info[_Fields.KEY2] = fields
+        return JoinOperator(self._env, self._info)
+
+
+class JoinOperatorProjection(DataSet):
+    def __init__(self, env, info):
+        super(JoinOperatorProjection, self).__init__(env, info)
+
+    def project_first(self, *fields):
+        """
+        Initiates a ProjectJoin transformation.
+
+        Projects the first join input.
+        If the first join input is a Tuple DataSet, fields can be selected by their index.
+        If the first join input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete JoinProjection.
+        """
+        self._info[_Fields.PROJECTIONS].append(("first", fields))
+        return self
+
+    def project_second(self, *fields):
+        """
+        Initiates a ProjectJoin transformation.
+
+        Projects the second join input.
+        If the second join input is a Tuple DataSet, fields can be selected by their index.
+        If the second join input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete JoinProjection.
+        """
+        self._info[_Fields.PROJECTIONS].append(("second", fields))
+        return self
+
+
+class JoinOperator(DataSet):
+    def __init__(self, env, info):
+        super(JoinOperator, self).__init__(env, info)
+        self._info[_Fields.TYPES] = None
+
+    def project_first(self, *fields):
+        """
+        Initiates a ProjectJoin transformation.
+
+        Projects the first join input.
+        If the first join input is a Tuple DataSet, fields can be selected by their index.
+        If the first join input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete JoinProjection.
+        """
+        return JoinOperatorProjection(self._env, self._info).project_first(*fields)
+
+    def project_second(self, *fields):
+        """
+        Initiates a ProjectJoin transformation.
+
+        Projects the second join input.
+        If the second join input is a Tuple DataSet, fields can be selected by their index.
+        If the second join input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete JoinProjection.
+        """
+        return JoinOperatorProjection(self._env, self._info).project_second(*fields)
+
+    def using(self, operator, types):
+        """
+        Finalizes a Join transformation.
+
+        Applies a JoinFunction to each pair of joined elements. Each JoinFunction call returns exactly one element.
+
+        :param operator:The JoinFunction that is called for each pair of joined elements.
+        :param types: The type of the resulting DataSet.
+        :return:A Set that represents the joined result DataSet.
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = JoinFunction()
+            operator.join = f
+        self._info[_Fields.OPERATOR] = operator
+        self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        self._info[_Fields.TYPES] = types
+        self._info[_Fields.NAME] = "PythonJoin"
+        self._env._sets.append(self._info)
+        return OperatorSet(self._env, self._info, copy_set=True)
+
+
+class CrossOperatorProjection(DataSet):
+    def __init__(self, env, info):
+        super(CrossOperatorProjection, self).__init__(env, info)
+
+    def project_first(self, *fields):
+        """
+        Initiates a ProjectCross transformation.
+
+        Projects the first cross input.
+        If the first cross input is a Tuple DataSet, fields can be selected by their index.
+        If the first cross input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete CrossProjection.
+        """
+        self._info[_Fields.PROJECTIONS].append(("first", fields))
+        return self
+
+    def project_second(self, *fields):
+        """
+        Initiates a ProjectCross transformation.
+
+        Projects the second cross input.
+        If the second cross input is a Tuple DataSet, fields can be selected by their index.
+        If the second cross input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete CrossProjection.
+        """
+        self._info[_Fields.PROJECTIONS].append(("second", fields))
+        return self
+
+
+class CrossOperator(DataSet):
+    def __init__(self, env, info):
+        super(CrossOperator, self).__init__(env, info)
+        info[_Fields.TYPES] = None
+
+    def project_first(self, *fields):
+        """
+        Initiates a ProjectCross transformation.
+
+        Projects the first cross input.
+        If the first cross input is a Tuple DataSet, fields can be selected by their index.
+        If the first cross input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete CrossProjection.
+        """
+        return CrossOperatorProjection(self._env, self._info).project_first(*fields)
+
+    def project_second(self, *fields):
+        """
+        Initiates a ProjectCross transformation.
+
+        Projects the second cross input.
+        If the second cross input is a Tuple DataSet, fields can be selected by their index.
+        If the second cross input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete CrossProjection.
+        """
+        return CrossOperatorProjection(self._env, self._info).project_second(*fields)
+
+    def using(self, operator, types):
+        """
+        Finalizes a Cross transformation.
+
+        Applies a CrossFunction to each pair of crossed elements. Each CrossFunction call returns exactly one element.
+
+        :param operator:The CrossFunction that is called for each pair of crossed elements.
+        :param types: The type of the resulting DataSet.
+        :return:A Set that represents the crossed result DataSet.
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = CrossFunction()
+            operator.cross = f
+        self._info[_Fields.OPERATOR] = operator
+        self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        self._info[_Fields.TYPES] = types
+        self._info[_Fields.NAME] = "PythonCross"
+        return OperatorSet(self._env, self._info, copy_set=True)
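
To show how the operators in this file chain together, a small end-to-end sketch (element values and the output path are made up; env.execute() is assumed to follow, defined in Environment.py below):

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import WriteMode

    env = get_environment()
    counts = env.from_elements(("a", 1), ("a", 3), ("b", 2))
    names = env.from_elements(("a", "alpha"), ("b", "beta"))

    # plain Python functions are accepted and wrapped automatically
    # (see the TYPES.FunctionType checks above)
    filtered = counts.filter(lambda value: value[1] > 1)

    # default join on field 0, then project to (word, count, name)
    result = filtered.join(names).where(0).equal_to(0).project_first(0, 1).project_second(1)
    result.write_csv("file:///tmp/result.csv", write_mode=WriteMode.OVERWRITE)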

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
new file mode 100644
index 0000000..236eda4
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -0,0 +1,345 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.connection import Connection
+from flink.connection import Collector
+from flink.plan.DataSet import DataSet
+from flink.plan.Constants import _Fields, _Identifier
+from flink.utilities import Switch
+import copy
+import sys
+
+
+def get_environment():
+    """
+    Creates an execution environment that represents the context in which the program is currently executed.
+
+    :return: The execution environment of the context in which the program is executed.
+    """
+    return Environment()
+
+
+class Environment(object):
+    def __init__(self):
+        # util
+        self._counter = 0
+
+        # parameters
+        self._parameters = []
+
+        # sets
+        self._sources = []
+        self._sets = []
+        self._sinks = []
+
+        # specials
+        self._broadcast = []
+
+    def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','):
+        """
+        Creates a DataSet that represents the tuples produced by reading the given CSV file.
+
+        :param path: The path of the CSV file.
+        :param types: Specifies the types for the CSV fields.
+        :param line_delimiter: The record delimiter, "\n" by default.
+        :param field_delimiter: The field delimiter, "," by default.
+        :return: A DataSet that represents the data read from the given CSV file.
+        """
+        child = dict()
+        child_set = DataSet(self, child)
+        child[_Fields.IDENTIFIER] = _Identifier.SOURCE_CSV
+        child[_Fields.DELIMITER_LINE] = line_delimiter
+        child[_Fields.DELIMITER_FIELD] = field_delimiter
+        child[_Fields.PATH] = path
+        child[_Fields.TYPES] = types
+        self._sources.append(child)
+        return child_set
+
+    def read_text(self, path):
+        """
+        Creates a DataSet that represents the Strings produced by reading the given file line wise.
+
+        The file will be read with the system's default character set.
+
+        :param path: The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+        :return: A DataSet that represents the data read from the given file as text lines.
+        """
+        child = dict()
+        child_set = DataSet(self, child)
+        child[_Fields.IDENTIFIER] = _Identifier.SOURCE_TEXT
+        child[_Fields.PATH] = path
+        self._sources.append(child)
+        return child_set
+
+    def from_elements(self, *elements):
+        """
+        Creates a new data set that contains the given elements.
+
+        The elements must all be of the same type, for example, all Strings or all Integers.
+        The sequence of elements must not be empty.
+
+        :param elements: The elements to make up the data set.
+        :return: A DataSet representing the given list of elements.
+        """
+        child = dict()
+        child_set = DataSet(self, child)
+        child[_Fields.IDENTIFIER] = _Identifier.SOURCE_VALUE
+        child[_Fields.VALUES] = elements
+        self._sources.append(child)
+        return child_set
+
+    def set_degree_of_parallelism(self, degree):
+        """
+        Sets the degree of parallelism (DOP) for operations executed through this environment.
+
+        Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with x parallel instances.
+
+        :param degree: The degree of parallelism.
+        """
+        self._parameters.append(("dop", degree))
+
+    def execute(self, local=False, debug=False):
+        """
+        Triggers the program execution.
+
+        The environment will execute all parts of the program that have resulted in a "sink" operation.
+
+        :param local: Whether the program is executed locally.
+        :param debug: Whether the program is executed in debug mode; implies local execution.
+        """
+        if debug:
+            local = True
+        self._parameters.append(("mode", local))
+        self._parameters.append(("debug", debug))
+        self._optimize_plan()
+
+        plan_mode = sys.stdin.readline().rstrip('\n') == "plan"
+
+        if plan_mode:
+            output_path = sys.stdin.readline().rstrip('\n')
+            self._connection = Connection.OneWayBusyBufferingMappedFileConnection(output_path)
+            self._collector = Collector.TypedCollector(self._connection)
+            self._send_plan()
+            self._connection._write_buffer()
+        else:
+            import struct
+            operator = None
+            try:
+                port = int(sys.stdin.readline().rstrip('\n'))
+
+                id = int(sys.stdin.readline().rstrip('\n'))
+                input_path = sys.stdin.readline().rstrip('\n')
+                output_path = sys.stdin.readline().rstrip('\n')
+
+                operator = None
+                for set in self._sets:
+                    if set[_Fields.ID] == id:
+                        operator = set[_Fields.OPERATOR]
+                    if set[_Fields.ID] == -id:
+                        operator = set[_Fields.COMBINEOP]
+                operator._configure(input_path, output_path, port)
+                operator._go()
+                sys.stdout.flush()
+                sys.stderr.flush()
+            except:
+                sys.stdout.flush()
+                sys.stderr.flush()
+                if operator is not None:
+                    operator._connection._socket.send(struct.pack(">i", -2))
+                raise
+
+    def _optimize_plan(self):
+        self._find_chains()
+
+    def _find_chains(self):
+        udf = set([_Identifier.MAP, _Identifier.FLATMAP, _Identifier.FILTER, _Identifier.MAPPARTITION,
+                   _Identifier.GROUPREDUCE, _Identifier.REDUCE, _Identifier.COGROUP,
+                   _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST,
+                   _Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT])
+        chainable = set([_Identifier.MAP, _Identifier.FILTER, _Identifier.FLATMAP, _Identifier.GROUPREDUCE, _Identifier.REDUCE])
+        multi_input = set([_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT, _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.COGROUP, _Identifier.UNION])
+        x = len(self._sets) - 1
+        while x > -1:
+            child = self._sets[x]
+            child_type = child[_Fields.IDENTIFIER]
+            if child_type in chainable:
+                parent = child[_Fields.PARENT]
+                parent_type = parent[_Fields.IDENTIFIER]
+                if len(parent[_Fields.SINKS]) == 0:
+                    if child_type == _Identifier.GROUPREDUCE or child_type == _Identifier.REDUCE:
+                        if child[_Fields.COMBINE]:
+                            while parent_type == _Identifier.GROUP or parent_type == _Identifier.SORT:
+                                parent = parent[_Fields.PARENT]
+                                parent_type = parent[_Fields.IDENTIFIER]
+                            if parent_type in udf and len(parent[_Fields.CHILDREN]) == 1:
+                                if parent[_Fields.OPERATOR] is not None:
+                                    function = child[_Fields.COMBINEOP]
+                                    parent[_Fields.OPERATOR]._chain(function)
+                                    child[_Fields.COMBINE] = False
+                                    parent[_Fields.NAME] += " -> PythonCombine"
+                                    for bcvar in child[_Fields.BCVARS]:
+                                        bcvar_copy = copy.deepcopy(bcvar)
+                                        bcvar_copy[_Fields.PARENT] = parent
+                                        self._broadcast.append(bcvar_copy)
+                    else:
+                        if parent_type in udf and len(parent[_Fields.CHILDREN]) == 1:
+                            parent_op = parent[_Fields.OPERATOR]
+                            if parent_op is not None:
+                                function = child[_Fields.OPERATOR]
+                                parent_op._chain(function)
+                                parent[_Fields.NAME] += " -> " + child[_Fields.NAME]
+                                parent[_Fields.TYPES] = child[_Fields.TYPES]
+                                for grand_child in child[_Fields.CHILDREN]:
+                                    if grand_child[_Fields.IDENTIFIER] in multi_input:
+                                        if grand_child[_Fields.PARENT][_Fields.ID] == child[_Fields.ID]:
+                                            grand_child[_Fields.PARENT] = parent
+                                        else:
+                                            grand_child[_Fields.OTHER] = parent
+                                    else:
+                                        grand_child[_Fields.PARENT] = parent
+                                        parent[_Fields.CHILDREN].append(grand_child)
+                                parent[_Fields.CHILDREN].remove(child)
+                                for sink in child[_Fields.SINKS]:
+                                    sink[_Fields.PARENT] = parent
+                                    parent[_Fields.SINKS].append(sink)
+                                for bcvar in child[_Fields.BCVARS]:
+                                    bcvar[_Fields.PARENT] = parent
+                                    parent[_Fields.BCVARS].append(bcvar)
+                                self._remove_set(child)
+            x -= 1
+
+    def _remove_set(self, set):
+        self._sets[:] = [s for s in self._sets if s[_Fields.ID] != set[_Fields.ID]]
+
+    def _send_plan(self):
+        self._send_parameters()
+        self._collector.collect(len(self._sources) + len(self._sets) + len(self._sinks) + len(self._broadcast))
+        self._send_sources()
+        self._send_operations()
+        self._send_sinks()
+        self._send_broadcast()
+
+    def _send_parameters(self):
+        self._collector.collect(len(self._parameters))
+        for parameter in self._parameters:
+            self._collector.collect(parameter)
+
+    def _send_sources(self):
+        for source in self._sources:
+            identifier = source[_Fields.IDENTIFIER]
+            collect = self._collector.collect
+            collect(identifier)
+            collect(source[_Fields.ID])
+            for case in Switch(identifier):
+                if case(_Identifier.SOURCE_CSV):
+                    collect(source[_Fields.PATH])
+                    collect(source[_Fields.DELIMITER_FIELD])
+                    collect(source[_Fields.DELIMITER_LINE])
+                    collect(source[_Fields.TYPES])
+                    break
+                if case(_Identifier.SOURCE_TEXT):
+                    collect(source[_Fields.PATH])
+                    break
+                if case(_Identifier.SOURCE_VALUE):
+                    collect(len(source[_Fields.VALUES]))
+                    for value in source[_Fields.VALUES]:
+                        collect(value)
+                    break
+
+    def _send_operations(self):
+        collect = self._collector.collect
+        for set in self._sets:
+            identifier = set[_Fields.IDENTIFIER]
+            collect(identifier)
+            collect(set[_Fields.ID])
+            collect(set[_Fields.PARENT][_Fields.ID])
+            for case in Switch(identifier):
+                if case(_Identifier.SORT):
+                    collect(set[_Fields.FIELD])
+                    collect(set[_Fields.ORDER])
+                    break
+                if case(_Identifier.GROUP):
+                    collect(set[_Fields.KEYS])
+                    break
+                if case(_Identifier.COGROUP):
+                    collect(set[_Fields.OTHER][_Fields.ID])
+                    collect(set[_Fields.KEY1])
+                    collect(set[_Fields.KEY2])
+                    collect(set[_Fields.TYPES])
+                    collect(set[_Fields.NAME])
+                    break
+                if case(_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST):
+                    collect(set[_Fields.OTHER][_Fields.ID])
+                    collect(set[_Fields.TYPES])
+                    collect(len(set[_Fields.PROJECTIONS]))
+                    for p in set[_Fields.PROJECTIONS]:
+                        collect(p[0])
+                        collect(p[1])
+                    collect(set[_Fields.NAME])
+                    break
+                if case(_Identifier.REDUCE, _Identifier.GROUPREDUCE):
+                    collect(set[_Fields.TYPES])
+                    collect(set[_Fields.COMBINE])
+                    collect(set[_Fields.NAME])
+                    break
+                if case(_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT):
+                    collect(set[_Fields.KEY1])
+                    collect(set[_Fields.KEY2])
+                    collect(set[_Fields.OTHER][_Fields.ID])
+                    collect(set[_Fields.TYPES])
+                    collect(len(set[_Fields.PROJECTIONS]))
+                    for p in set[_Fields.PROJECTIONS]:
+                        collect(p[0])
+                        collect(p[1])
+                    collect(set[_Fields.NAME])
+                    break
+                if case(_Identifier.MAP, _Identifier.MAPPARTITION, _Identifier.FLATMAP, _Identifier.FILTER):
+                    collect(set[_Fields.TYPES])
+                    collect(set[_Fields.NAME])
+                    break
+                if case(_Identifier.UNION):
+                    collect(set[_Fields.OTHER][_Fields.ID])
+                    break
+                if case(_Identifier.PROJECTION):
+                    collect(set[_Fields.KEYS])
+                    break
+                if case():
+                    raise KeyError("Environment._send_operations(): Invalid operation identifier: " + str(identifier))
+
+    def _send_sinks(self):
+        for sink in self._sinks:
+            identifier = sink[_Fields.IDENTIFIER]
+            collect = self._collector.collect
+            collect(identifier)
+            collect(sink[_Fields.PARENT][_Fields.ID])
+            for case in Switch(identifier):
+                if case(_Identifier.SINK_CSV):
+                    collect(sink[_Fields.PATH])
+                    collect(sink[_Fields.DELIMITER_FIELD])
+                    collect(sink[_Fields.DELIMITER_LINE])
+                    collect(sink[_Fields.WRITE_MODE])
+                    break
+                if case(_Identifier.SINK_TEXT):
+                    collect(sink[_Fields.PATH])
+                    collect(sink[_Fields.WRITE_MODE])
+                    break
+                if case(_Identifier.SINK_PRINT):
+                    collect(sink[_Fields.TO_ERR])
+                    break
+
+    def _send_broadcast(self):
+        collect = self._collector.collect
+        for entry in self._broadcast:
+            collect(_Identifier.BROADCAST)
+            collect(entry[_Fields.PARENT][_Fields.ID])
+            collect(entry[_Fields.OTHER][_Fields.ID])
+            collect(entry[_Fields.NAME])
\ No newline at end of file
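
To tie the pieces of this Environment together, here is a hedged end-to-end sketch. The
read_csv/write_csv/execute calls mirror the API above and the test_csv.py test further
below; the map/filter signatures (function plus result types for map) are assumed from
the rest of this commit, and the input path is a placeholder.

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import INT, STRING, WriteMode

    env = get_environment()

    # source: a CSV of (INT, INT, STRING) records; the path is hypothetical
    data = env.read_csv("/tmp/flink/input", (INT, INT, STRING))

    # two chainable UDF operators; since the map is a single-child UDF parent with no
    # sinks, _find_chains() can fold the filter into it ("PythonMap -> PythonFilter")
    # before the plan is serialized
    result = data \
        .map(lambda t: (t[0] + t[1], t[2]), (INT, STRING)) \
        .filter(lambda t: t[0] > 5)

    # sink + submission: execute() answers the "plan" handshake on stdin and streams
    # the serialized plan through the mapped-file connection
    result.write_csv("/tmp/flink/result", line_delimiter="\n", field_delimiter="|", write_mode=WriteMode.OVERWRITE)
    env.set_degree_of_parallelism(1)
    env.execute(local=True)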

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/__init__.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/__init__.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/__init__.py
new file mode 100644
index 0000000..d35bf39
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/utilities/__init__.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/utilities/__init__.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/utilities/__init__.py
new file mode 100644
index 0000000..faae78a
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/utilities/__init__.py
@@ -0,0 +1,36 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+class Switch(object):
+    def __init__(self, value):
+        self.value = value
+        self.fall = False
+
+    def __iter__(self):
+        yield self.match
+        return  # generator end implies StopIteration; an explicit raise breaks under PEP 479
+
+    def match(self, *args):
+        if self.fall or not args:
+            return True
+        elif self.value in args:
+            self.fall = True
+            return True
+        else:
+            return False
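
Switch emulates a switch/case statement for the plan-serialization code above: iterating
over Switch(value) yields a single match function, each "if case(a, b):" arm fires when
the value equals one of its arguments, a bare "case()" acts as the default arm, and once
an arm has matched every later case() call also returns True, so omitting "break" gives
C-style fall-through. A small illustrative sketch (the describe() helper is hypothetical):

    from flink.utilities import Switch

    def describe(identifier):
        for case in Switch(identifier):
            if case("map", "flatmap", "filter"):
                return "element-wise operator"
            if case("join", "cross", "cogroup"):
                return "binary operator"
            if case():  # default arm
                return "unknown operator"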

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/setup.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/setup.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/setup.py
new file mode 100644
index 0000000..82447e9
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/setup.py
@@ -0,0 +1,33 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from distutils.core import setup
+
+setup(
+    name='flink',
+    version='1.0',
+    packages=['flink',
+              'flink.connection',
+              'flink.functions',
+              'flink.plan',
+              'flink.utilities'],
+    url='http://flink.apache.org',
+    license='Licensed under the Apache License, Version 2.0',
+    author='',
+    author_email='',
+    description='Flink Python API'
+)

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
new file mode 100644
index 0000000..697731f
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import static org.apache.flink.python.api.PythonPlanBinder.ARGUMENT_PYTHON_2;
+import static org.apache.flink.python.api.PythonPlanBinder.ARGUMENT_PYTHON_3;
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PythonPlanBinderTest {
+	private static final Logger LOG = LoggerFactory.getLogger(PythonPlanBinder.class);
+	
+	private static boolean python2Supported = true;
+	private static boolean python3Supported = true;
+	private static List<String> TEST_FILES;
+	
+	@BeforeClass
+	public static void setup() throws Exception {
+		findTestFiles();
+		checkPythonSupport();
+	}
+	
+	private static void findTestFiles() throws Exception {
+		TEST_FILES = new ArrayList<>();
+		FileSystem fs = FileSystem.getLocalFileSystem();
+		FileStatus[] status = fs.listStatus(
+				new Path(fs.getWorkingDirectory().toString()
+						+ "/src/test/python/org/apache/flink/python/api"));
+		for (FileStatus f : status) {
+			String file = f.getPath().toString();
+			if (file.endsWith(".py")) {
+				TEST_FILES.add(file);
+			}
+		}
+	}
+	
+	private static void checkPythonSupport() {	
+		try {
+			Runtime.getRuntime().exec("python");
+		} catch (IOException ex) {
+			python2Supported = false;
+			LOG.info("No Python 2 runtime detected.");
+		}
+		try {
+			Runtime.getRuntime().exec("python3");
+		} catch (IOException ex) {
+			python3Supported = false;
+			LOG.info("No Python 3 runtime detected.");
+		}
+	}
+	
+	@Test
+	public void testPython2() throws Exception {
+		if (python2Supported) {
+			for (String file : TEST_FILES) {
+				LOG.info("testing " + file);
+				PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_2, file});
+			}
+		}
+	}
+	
+	@Test
+	public void testPython3() throws Exception {
+		if (python3Supported) {
+			for (String file : TEST_FILES) {
+				LOG.info("testing " + file);
+				PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_3, file});
+			}
+		}
+	}
+}
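
A note on the test harness above: findTestFiles() simply collects every .py file in
src/test/python/org/apache/flink/python/api, so a new Python API test is picked up by both
testPython2() and testPython3() just by dropping a script into that directory; each test
becomes a no-op when the corresponding "python" or "python3" binary cannot be started (an
info message is logged during setup).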

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_csv
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_csv b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_csv
new file mode 100644
index 0000000..a103a5c
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_csv
@@ -0,0 +1,2 @@
+4,2,hello
+3,2,world

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_text
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_text b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_text
new file mode 100644
index 0000000..e7be084
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_text
@@ -0,0 +1,2 @@
+sup guys
+i am the world

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_csv.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_csv.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_csv.py
new file mode 100644
index 0000000..62b6a1d
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_csv.py
@@ -0,0 +1,31 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import INT, STRING
+from flink.plan.Constants import WriteMode
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.read_csv("src/test/python/org/apache/flink/python/api/data_csv", (INT, INT, STRING))
+
+    d1.write_csv("/tmp/flink/result", line_delimiter="\n", field_delimiter="|", write_mode=WriteMode.OVERWRITE)
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)