You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2015/11/18 12:27:26 UTC

flink git commit: [FLINK-2441] Introduce Python OperationInfo

Repository: flink
Updated Branches:
  refs/heads/master 99d52cc69 -> 5d37fe146


[FLINK-2441] Introduce Python OperationInfo

This closes #1352.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d37fe14
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d37fe14
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d37fe14

Branch: refs/heads/master
Commit: 5d37fe146d57f6d661f7fc9b86c66a6e8a63ab0b
Parents: 99d52cc
Author: zentol <ch...@apache.org>
Authored: Fri Jul 24 20:18:47 2015 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Nov 18 12:26:52 2015 +0100

----------------------------------------------------------------------
 .../flink/python/api/flink/plan/Constants.py    |  28 --
 .../flink/python/api/flink/plan/DataSet.py      | 406 +++++++++----------
 .../flink/python/api/flink/plan/Environment.py  | 209 +++++-----
 .../python/api/flink/plan/OperationInfo.py      |  50 +++
 .../flink/python/api/test_type_deduction.py     |  15 +-
 5 files changed, 357 insertions(+), 351 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5d37fe14/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
index b0d79e8..0c9fe80 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
@@ -47,34 +47,6 @@ class _Identifier(object):
     BROADCAST = "broadcast"
 
 
-class _Fields(object):
-    PARENT = "parent"
-    OTHER = "other_set"
-    SINKS = "sinks"
-    IDENTIFIER = "identifier"
-    FIELD = "field"
-    ORDER = "order"
-    KEYS = "keys"
-    KEY1 = "key1"
-    KEY2 = "key2"
-    TYPES = "types"
-    OPERATOR = "operator"
-    META = "meta"
-    NAME = "name"
-    COMBINE = "combine"
-    DELIMITER_LINE = "del_l"
-    DELIMITER_FIELD = "del_f"
-    WRITE_MODE = "write"
-    PATH = "path"
-    VALUES = "values"
-    COMBINEOP = "combineop"
-    CHILDREN = "children"
-    BCVARS = "bcvars"
-    PROJECTIONS = "projections"
-    ID = "id"
-    TO_ERR = "to_error"
-
-
 class WriteMode(object):
     NO_OVERWRITE = 0
     OVERWRITE = 1

http://git-wip-us.apache.org/repos/asf/flink/blob/5d37fe14/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
index 390a08d..7ef5488 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
@@ -19,7 +19,8 @@ import inspect
 import copy
 import types as TYPES
 
-from flink.plan.Constants import _Fields, _Identifier, WriteMode, STRING
+from flink.plan.Constants import _Identifier, WriteMode, STRING
+from flink.plan.OperationInfo import OperationInfo
 from flink.functions.CoGroupFunction import CoGroupFunction
 from flink.functions.FilterFunction import FilterFunction
 from flink.functions.FlatMapFunction import FlatMapFunction
@@ -36,29 +37,29 @@ def deduct_output_type(dataset):
     default = set([_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.JOINT, _Identifier.JOINH, _Identifier.JOIN])
 
     while True:
-        dataset_type = dataset[_Fields.IDENTIFIER]
+        dataset_type = dataset.identifier
         if dataset_type in skip:
-            dataset = dataset[_Fields.PARENT]
+            dataset = dataset.parent
             continue
         if dataset_type in source:
             if dataset_type == _Identifier.SOURCE_TEXT:
                 return STRING
             if dataset_type == _Identifier.SOURCE_VALUE:
-                return dataset[_Fields.VALUES][0]
+                return dataset.values[0]
             if dataset_type == _Identifier.SOURCE_CSV:
-                return dataset[_Fields.TYPES]
+                return dataset.types
         if dataset_type == _Identifier.PROJECTION:
-            return tuple([deduct_output_type(dataset[_Fields.PARENT])[k] for k in dataset[_Fields.KEYS]])
+            return tuple([deduct_output_type(dataset.parent)[k] for k in dataset.keys])
         if dataset_type in default:
-            if dataset[_Fields.OPERATOR] is not None: #udf-join/cross
-                return dataset[_Fields.TYPES]
-            if len(dataset[_Fields.PROJECTIONS]) == 0: #defaultjoin/-cross
-                return (deduct_output_type(dataset[_Fields.PARENT]), deduct_output_type(dataset[_Fields.OTHER]))
+            if dataset.operator is not None: #udf-join/cross
+                return dataset.types
+            if len(dataset.projections) == 0: #defaultjoin/-cross
+                return (deduct_output_type(dataset.parent), deduct_output_type(dataset.other))
             else: #projectjoin/-cross
-                t1 = deduct_output_type(dataset[_Fields.PARENT])
-                t2 = deduct_output_type(dataset[_Fields.OTHER])
+                t1 = deduct_output_type(dataset.parent)
+                t2 = deduct_output_type(dataset.other)
                 out_type = []
-                for prj in dataset[_Fields.PROJECTIONS]:
+                for prj in dataset.projections:
                     if len(prj[1]) == 0: #projection on non-tuple dataset
                         if prj[0] == "first":
                             out_type.append(t1)
@@ -71,30 +72,25 @@ def deduct_output_type(dataset):
                             else:
                                 out_type.append(t2[key])
                 return tuple(out_type)
-        return dataset[_Fields.TYPES]
+        return dataset.types
 
 
 class Set(object):
-    def __init__(self, env, info, copy_set=False):
+    def __init__(self, env, info):
         self._env = env
         self._info = info
-        if not copy_set:
-            self._info[_Fields.ID] = env._counter
-            self._info[_Fields.BCVARS] = []
-            self._info[_Fields.CHILDREN] = []
-            self._info[_Fields.SINKS] = []
-            self._info[_Fields.NAME] = None
-            env._counter += 1
+        info.id = env._counter
+        env._counter += 1
 
     def output(self, to_error=False):
         """
         Writes a DataSet to the standard output stream (stdout).
         """
-        child = dict()
-        child[_Fields.IDENTIFIER] = _Identifier.SINK_PRINT
-        child[_Fields.PARENT] = self._info
-        child[_Fields.TO_ERR] = to_error
-        self._info[_Fields.SINKS].append(child)
+        child = OperationInfo()
+        child.identifier = _Identifier.SINK_PRINT
+        child.parent = self._info
+        child.to_err = to_error
+        self._info.sinks.append(child)
         self._env._sinks.append(child)
 
     def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE):
@@ -104,12 +100,12 @@ class Set(object):
         :param path: he path pointing to the location the text file is written to.
         :param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten
         """
-        child = dict()
-        child[_Fields.IDENTIFIER] = _Identifier.SINK_TEXT
-        child[_Fields.PARENT] = self._info
-        child[_Fields.PATH] = path
-        child[_Fields.WRITE_MODE] = write_mode
-        self._info[_Fields.SINKS].append(child)
+        child = OperationInfo()
+        child.identifier = _Identifier.SINK_TEXT
+        child.parent = self._info
+        child.path = path
+        child.write_mode = write_mode
+        self._info.sinks.append(child)
         self._env._sinks.append(child)
 
     def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE):
@@ -120,14 +116,14 @@ class Set(object):
         :param path: The path pointing to the location the CSV file is written to.
         :param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten
         """
-        child = dict()
-        child[_Fields.IDENTIFIER] = _Identifier.SINK_CSV
-        child[_Fields.PATH] = path
-        child[_Fields.PARENT] = self._info
-        child[_Fields.DELIMITER_FIELD] = field_delimiter
-        child[_Fields.DELIMITER_LINE] = line_delimiter
-        child[_Fields.WRITE_MODE] = write_mode
-        self._info[_Fields.SINKS].append(child)
+        child = OperationInfo()
+        child.identifier = _Identifier.SINK_CSV
+        child.path = path
+        child.parent = self._info
+        child.delimiter_field = field_delimiter
+        child.delimiter_line = line_delimiter
+        child.write_mode = write_mode
+        self._info.sinks.append(child)
         self._env._sinks.append(child)
 
     def reduce_group(self, operator, types, combinable=False):
@@ -147,28 +143,26 @@ class Set(object):
             f = operator
             operator = GroupReduceFunction()
             operator.reduce = f
-        child = dict()
+        child = OperationInfo()
         child_set = OperatorSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.GROUPREDUCE
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OPERATOR] = copy.deepcopy(operator)
-        child[_Fields.OPERATOR]._combine = False
-        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child[_Fields.TYPES] = types
-        child[_Fields.COMBINE] = combinable
-        child[_Fields.COMBINEOP] = operator
-        child[_Fields.COMBINEOP]._combine = True
-        child[_Fields.NAME] = "PythonGroupReduce"
-        self._info[_Fields.CHILDREN].append(child)
+        child.identifier = _Identifier.GROUPREDUCE
+        child.parent = self._info
+        child.operator = copy.deepcopy(operator)
+        child.operator._combine = False
+        child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child.types = types
+        child.combine = combinable
+        child.combineop = operator
+        child.combineop._combine = True
+        child.name = "PythonGroupReduce"
+        self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
 
 
 class ReduceSet(Set):
-    def __init__(self, env, info, copy_set=False):
-        super(ReduceSet, self).__init__(env, info, copy_set)
-        if not copy_set:
-            self._is_chained = False
+    def __init__(self, env, info):
+        super(ReduceSet, self).__init__(env, info)
 
     def reduce(self, operator):
         """
@@ -184,24 +178,24 @@ class ReduceSet(Set):
             f = operator
             operator = ReduceFunction()
             operator.reduce = f
-        child = dict()
+        child = OperationInfo()
         child_set = OperatorSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.REDUCE
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OPERATOR] = operator
-        child[_Fields.COMBINEOP] = operator
-        child[_Fields.COMBINE] = False
-        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child[_Fields.NAME] = "PythonReduce"
-        child[_Fields.TYPES] = deduct_output_type(self._info)
-        self._info[_Fields.CHILDREN].append(child)
+        child.identifier = _Identifier.REDUCE
+        child.parent = self._info
+        child.operator = operator
+        child.combineop = operator
+        child.combine = False
+        child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child.name = "PythonReduce"
+        child.types = deduct_output_type(self._info)
+        self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
 
 
 class DataSet(ReduceSet):
-    def __init__(self, env, info, copy_set=False):
-        super(DataSet, self).__init__(env, info, copy_set)
+    def __init__(self, env, info):
+        super(DataSet, self).__init__(env, info)
 
     def project(self, *fields):
         """
@@ -215,12 +209,12 @@ class DataSet(ReduceSet):
         :return: The projected DataSet.
 
         """
-        child = dict()
+        child = OperationInfo()
         child_set = DataSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.PROJECTION
-        child[_Fields.PARENT] = self._info
-        child[_Fields.KEYS] = fields
-        self._info[_Fields.CHILDREN].append(child)
+        child.identifier = _Identifier.PROJECTION
+        child.parent = self._info
+        child.keys = fields
+        self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
 
@@ -237,14 +231,14 @@ class DataSet(ReduceSet):
         :param keys: One or more field positions on which the DataSet will be grouped.
         :return:A Grouping on which a transformation needs to be applied to obtain a transformed DataSet.
         """
-        child = dict()
+        child = OperationInfo()
         child_chain = []
         child_set = UnsortedGrouping(self._env, child, child_chain)
-        child[_Fields.IDENTIFIER] = _Identifier.GROUP
-        child[_Fields.PARENT] = self._info
-        child[_Fields.KEYS] = keys
+        child.identifier = _Identifier.GROUP
+        child.parent = self._info
+        child.keys = keys
         child_chain.append(child)
-        self._info[_Fields.CHILDREN].append(child)
+        self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
 
@@ -261,13 +255,13 @@ class DataSet(ReduceSet):
         :param other_set: The other DataSet of the CoGroup transformation.
         :return:A CoGroupOperator to continue the definition of the CoGroup transformation.
         """
-        child = dict()
-        other_set._info[_Fields.CHILDREN].append(child)
+        child = OperationInfo()
+        other_set._info.children.append(child)
         child_set = CoGroupOperatorWhere(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.COGROUP
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OTHER] = other_set._info
-        self._info[_Fields.CHILDREN].append(child)
+        child.identifier = _Identifier.COGROUP
+        child.parent = self._info
+        child.other = other_set._info
+        self._info.children.append(child)
         return child_set
 
     def cross(self, other_set):
@@ -308,16 +302,13 @@ class DataSet(ReduceSet):
         return self._cross(other_set, _Identifier.CROSST)
 
     def _cross(self, other_set, identifier):
-        child = dict()
+        child = OperationInfo()
         child_set = CrossOperator(self._env, child)
-        child[_Fields.IDENTIFIER] = identifier
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OTHER] = other_set._info
-        child[_Fields.PROJECTIONS] = []
-        child[_Fields.OPERATOR] = None
-        child[_Fields.META] = None
-        self._info[_Fields.CHILDREN].append(child)
-        other_set._info[_Fields.CHILDREN].append(child)
+        child.identifier = identifier
+        child.parent = self._info
+        child.other = other_set._info
+        self._info.children.append(child)
+        other_set._info.children.append(child)
         self._env._sets.append(child)
         return child_set
 
@@ -335,15 +326,15 @@ class DataSet(ReduceSet):
             f = operator
             operator = FilterFunction()
             operator.filter = f
-        child = dict()
+        child = OperationInfo()
         child_set = OperatorSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.FILTER
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OPERATOR] = operator
-        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child[_Fields.NAME] = "PythonFilter"
-        child[_Fields.TYPES] = deduct_output_type(self._info)
-        self._info[_Fields.CHILDREN].append(child)
+        child.identifier = _Identifier.FILTER
+        child.parent = self._info
+        child.operator = operator
+        child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child.name = "PythonFilter"
+        child.types = deduct_output_type(self._info)
+        self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
 
@@ -362,15 +353,15 @@ class DataSet(ReduceSet):
             f = operator
             operator = FlatMapFunction()
             operator.flat_map = f
-        child = dict()
+        child = OperationInfo()
         child_set = OperatorSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.FLATMAP
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OPERATOR] = operator
-        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child[_Fields.TYPES] = types
-        child[_Fields.NAME] = "PythonFlatMap"
-        self._info[_Fields.CHILDREN].append(child)
+        child.identifier = _Identifier.FLATMAP
+        child.parent = self._info
+        child.operator = operator
+        child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child.types = types
+        child.name = "PythonFlatMap"
+        self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
 
@@ -412,16 +403,13 @@ class DataSet(ReduceSet):
         return self._join(other_set, _Identifier.JOINT)
 
     def _join(self, other_set, identifier):
-        child = dict()
+        child = OperationInfo()
         child_set = JoinOperatorWhere(self._env, child)
-        child[_Fields.IDENTIFIER] = identifier
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OTHER] = other_set._info
-        child[_Fields.OPERATOR] = None
-        child[_Fields.META] = None
-        child[_Fields.PROJECTIONS] = []
-        self._info[_Fields.CHILDREN].append(child)
-        other_set._info[_Fields.CHILDREN].append(child)
+        child.identifier = identifier
+        child.parent = self._info
+        child.other = other_set._info
+        self._info.children.append(child)
+        other_set._info.children.append(child)
         self._env._sets.append(child)
         return child_set
 
@@ -440,15 +428,15 @@ class DataSet(ReduceSet):
             f = operator
             operator = MapFunction()
             operator.map = f
-        child = dict()
+        child = OperationInfo()
         child_set = OperatorSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.MAP
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OPERATOR] = operator
-        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child[_Fields.TYPES] = types
-        child[_Fields.NAME] = "PythonMap"
-        self._info[_Fields.CHILDREN].append(child)
+        child.identifier = _Identifier.MAP
+        child.parent = self._info
+        child.operator = operator
+        child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child.types = types
+        child.name = "PythonMap"
+        self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
 
@@ -471,15 +459,15 @@ class DataSet(ReduceSet):
             f = operator
             operator = MapPartitionFunction()
             operator.map_partition = f
-        child = dict()
+        child = OperationInfo()
         child_set = OperatorSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.MAPPARTITION
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OPERATOR] = operator
-        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child[_Fields.TYPES] = types
-        child[_Fields.NAME] = "PythonMapPartition"
-        self._info[_Fields.CHILDREN].append(child)
+        child.identifier = _Identifier.MAPPARTITION
+        child.parent = self._info
+        child.operator = operator
+        child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child.types = types
+        child.name = "PythonMapPartition"
+        self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
 
@@ -492,28 +480,28 @@ class DataSet(ReduceSet):
         :param other_set: The other DataSet which is unioned with the current DataSet.
         :return:The resulting DataSet.
         """
-        child = dict()
+        child = OperationInfo()
         child_set = DataSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.UNION
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OTHER] = other_set._info
-        self._info[_Fields.CHILDREN].append(child)
-        other_set._info[_Fields.CHILDREN].append(child)
+        child.identifier = _Identifier.UNION
+        child.parent = self._info
+        child.other = other_set._info
+        self._info.children.append(child)
+        other_set._info.children.append(child)
         self._env._sets.append(child)
         return child_set
 
 
 class OperatorSet(DataSet):
-    def __init__(self, env, info, copy_set=False):
-        super(OperatorSet, self).__init__(env, info, copy_set)
+    def __init__(self, env, info):
+        super(OperatorSet, self).__init__(env, info)
 
     def with_broadcast_set(self, name, set):
-        child = dict()
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OTHER] = set._info
-        child[_Fields.NAME] = name
-        self._info[_Fields.BCVARS].append(child)
-        set._info[_Fields.CHILDREN].append(child)
+        child = OperationInfo()
+        child.parent = self._info
+        child.other = set._info
+        child.name = name
+        self._info.bcvars.append(child)
+        set._info.children.append(child)
         self._env._broadcast.append(child)
         return self
 
@@ -523,9 +511,7 @@ class Grouping(object):
         self._env = env
         self._child_chain = child_chain
         self._info = info
-        info[_Fields.ID] = env._counter
-        info[_Fields.CHILDREN] = []
-        info[_Fields.SINKS] = []
+        info.id = env._counter
         env._counter += 1
 
     def reduce_group(self, operator, types, combinable=False):
@@ -545,21 +531,21 @@ class Grouping(object):
             f = operator
             operator = GroupReduceFunction()
             operator.reduce = f
-        operator._set_grouping_keys(self._child_chain[0][_Fields.KEYS])
-        operator._set_sort_ops([(x[_Fields.FIELD], x[_Fields.ORDER]) for x in self._child_chain[1:]])
-        child = dict()
+        operator._set_grouping_keys(self._child_chain[0].keys)
+        operator._set_sort_ops([(x.field, x.order) for x in self._child_chain[1:]])
+        child = OperationInfo()
         child_set = OperatorSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.GROUPREDUCE
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OPERATOR] = copy.deepcopy(operator)
-        child[_Fields.OPERATOR]._combine = False
-        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child[_Fields.TYPES] = types
-        child[_Fields.COMBINE] = combinable
-        child[_Fields.COMBINEOP] = operator
-        child[_Fields.COMBINEOP]._combine = True
-        child[_Fields.NAME] = "PythonGroupReduce"
-        self._info[_Fields.CHILDREN].append(child)
+        child.identifier = _Identifier.GROUPREDUCE
+        child.parent = self._info
+        child.operator = copy.deepcopy(operator)
+        child.operator._combine = False
+        child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child.types = types
+        child.combine = combinable
+        child.combineop = operator
+        child.combineop._combine = True
+        child.name = "PythonGroupReduce"
+        self._info.children.append(child)
         self._env._sets.append(child)
 
         return child_set
@@ -575,13 +561,13 @@ class Grouping(object):
         :param order: The Order in which the specified Tuple field is sorted. See DataSet.Order.
         :return:A SortedGrouping with specified order of group element.
         """
-        child = dict()
+        child = OperationInfo()
         child_set = SortedGrouping(self._env, child, self._child_chain)
-        child[_Fields.IDENTIFIER] = _Identifier.SORT
-        child[_Fields.PARENT] = self._info
-        child[_Fields.FIELD] = field
-        child[_Fields.ORDER] = order
-        self._info[_Fields.CHILDREN].append(child)
+        child.identifier = _Identifier.SORT
+        child.parent = self._info
+        child.field = field
+        child.order = order
+        self._info.children.append(child)
         self._child_chain.append(child)
         self._env._sets.append(child)
         return child_set
@@ -601,22 +587,22 @@ class UnsortedGrouping(Grouping):
         :param operator:The ReduceFunction that is applied on the DataSet.
         :return:A ReduceOperator that represents the reduced DataSet.
         """
-        operator._set_grouping_keys(self._child_chain[0][_Fields.KEYS])
+        operator._set_grouping_keys(self._child_chain[0].keys)
         for i in self._child_chain:
             self._env._sets.append(i)
-        child = dict()
+        child = OperationInfo()
         child_set = OperatorSet(self._env, child)
-        child[_Fields.IDENTIFIER] = _Identifier.REDUCE
-        child[_Fields.PARENT] = self._info
-        child[_Fields.OPERATOR] = copy.deepcopy(operator)
-        child[_Fields.OPERATOR]._combine = False
-        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child[_Fields.COMBINE] = True
-        child[_Fields.COMBINEOP] = operator
-        child[_Fields.COMBINEOP]._combine = True
-        child[_Fields.NAME] = "PythonReduce"
-        child[_Fields.TYPES] = deduct_output_type(self._info)
-        self._info[_Fields.CHILDREN].append(child)
+        child.identifier = _Identifier.REDUCE
+        child.parent = self._info
+        child.operator = copy.deepcopy(operator)
+        child.operator._combine = False
+        child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child.combine = True
+        child.combineop = operator
+        child.combineop._combine = True
+        child.name = "PythonReduce"
+        child.types = deduct_output_type(self._info)
+        self._info.children.append(child)
         self._env._sets.append(child)
 
         return child_set
@@ -642,7 +628,7 @@ class CoGroupOperatorWhere(object):
         :param fields: The indexes of the Tuple fields of the first co-grouped DataSets that should be used as keys.
         :return: An incomplete CoGroup transformation.
         """
-        self._info[_Fields.KEY1] = fields
+        self._info.key1 = fields
         return CoGroupOperatorTo(self._env, self._info)
 
 
@@ -661,7 +647,7 @@ class CoGroupOperatorTo(object):
         :param fields: The indexes of the Tuple fields of the second co-grouped DataSet that should be used as keys.
         :return: An incomplete CoGroup transformation.
         """
-        self._info[_Fields.KEY2] = fields
+        self._info.key2 = fields
         return CoGroupOperatorUsing(self._env, self._info)
 
 
@@ -686,12 +672,12 @@ class CoGroupOperatorUsing(object):
             operator = CoGroupFunction()
             operator.co_group = f
         new_set = OperatorSet(self._env, self._info)
-        operator._keys1 = self._info[_Fields.KEY1]
-        operator._keys2 = self._info[_Fields.KEY2]
-        self._info[_Fields.OPERATOR] = operator
-        self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        self._info[_Fields.TYPES] = types
-        self._info[_Fields.NAME] = "PythonCoGroup"
+        operator._keys1 = self._info.key1
+        operator._keys2 = self._info.key2
+        self._info.operator = operator
+        self._info.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        self._info.types = types
+        self._info.name = "PythonCoGroup"
         self._env._sets.append(self._info)
         return new_set
 
@@ -712,7 +698,7 @@ class JoinOperatorWhere(object):
         :return:An incomplete Join transformation.
 
         """
-        self._info[_Fields.KEY1] = fields
+        self._info.key1 = fields
         return JoinOperatorTo(self._env, self._info)
 
 
@@ -731,7 +717,7 @@ class JoinOperatorTo(object):
         :param fields:The indexes of the Tuple fields of the second join DataSet that should be used as keys.
         :return:An incomplete Join Transformation.
         """
-        self._info[_Fields.KEY2] = fields
+        self._info.key2 = fields
         return JoinOperator(self._env, self._info)
 
 
@@ -750,7 +736,7 @@ class JoinOperatorProjection(DataSet):
         :param fields: The indexes of the selected fields.
         :return: An incomplete JoinProjection.
         """
-        self._info[_Fields.PROJECTIONS].append(("first", fields))
+        self._info.projections.append(("first", fields))
         return self
 
     def project_second(self, *fields):
@@ -764,14 +750,13 @@ class JoinOperatorProjection(DataSet):
         :param fields: The indexes of the selected fields.
         :return: An incomplete JoinProjection.
         """
-        self._info[_Fields.PROJECTIONS].append(("second", fields))
+        self._info.projections.append(("second", fields))
         return self
 
 
 class JoinOperator(DataSet):
     def __init__(self, env, info):
         super(JoinOperator, self).__init__(env, info)
-        self._info[_Fields.TYPES] = None
 
     def project_first(self, *fields):
         """
@@ -813,12 +798,12 @@ class JoinOperator(DataSet):
             f = operator
             operator = JoinFunction()
             operator.join = f
-        self._info[_Fields.OPERATOR] = operator
-        self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        self._info[_Fields.TYPES] = types
-        self._info[_Fields.NAME] = "PythonJoin"
+        self._info.operator = operator
+        self._info.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        self._info.types = types
+        self._info.name = "PythonJoin"
         self._env._sets.append(self._info)
-        return OperatorSet(self._env, self._info, copy_set=True)
+        return OperatorSet(self._env, self._info)
 
 
 class CrossOperatorProjection(DataSet):
@@ -836,7 +821,7 @@ class CrossOperatorProjection(DataSet):
         :param fields: The indexes of the selected fields.
         :return: An incomplete CrossProjection.
         """
-        self._info[_Fields.PROJECTIONS].append(("first", fields))
+        self._info.projections.append(("first", fields))
         return self
 
     def project_second(self, *fields):
@@ -850,14 +835,13 @@ class CrossOperatorProjection(DataSet):
         :param fields: The indexes of the selected fields.
         :return: An incomplete CrossProjection.
         """
-        self._info[_Fields.PROJECTIONS].append(("second", fields))
+        self._info.projections.append(("second", fields))
         return self
 
 
 class CrossOperator(DataSet):
     def __init__(self, env, info):
         super(CrossOperator, self).__init__(env, info)
-        info[_Fields.TYPES] = None
 
     def project_first(self, *fields):
         """
@@ -899,8 +883,8 @@ class CrossOperator(DataSet):
             f = operator
             operator = CrossFunction()
             operator.cross = f
-        self._info[_Fields.OPERATOR] = operator
-        self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        self._info[_Fields.TYPES] = types
-        self._info[_Fields.NAME] = "PythonCross"
-        return OperatorSet(self._env, self._info, copy_set=True)
+        self._info.operator = operator
+        self._info.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        self._info.types = types
+        self._info.name = "PythonCross"
+        return OperatorSet(self._env, self._info)

http://git-wip-us.apache.org/repos/asf/flink/blob/5d37fe14/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 8647686..bea6212 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -18,7 +18,8 @@
 from flink.connection import Connection
 from flink.connection import Collector
 from flink.plan.DataSet import DataSet
-from flink.plan.Constants import _Fields, _Identifier
+from flink.plan.Constants import _Identifier
+from flink.plan.OperationInfo import OperationInfo
 from flink.utilities import Switch
 import copy
 import sys
@@ -70,13 +71,13 @@ class Environment(object):
         :param types: Specifies the types for the CSV fields.
         :return:A CsvReader that can be used to configure the CSV input.
         """
-        child = dict()
+        child = OperationInfo()
         child_set = DataSet(self, child)
-        child[_Fields.IDENTIFIER] = _Identifier.SOURCE_CSV
-        child[_Fields.DELIMITER_LINE] = line_delimiter
-        child[_Fields.DELIMITER_FIELD] = field_delimiter
-        child[_Fields.PATH] = path
-        child[_Fields.TYPES] = types
+        child.identifier = _Identifier.SOURCE_CSV
+        child.delimiter_line = line_delimiter
+        child.delimiter_field = field_delimiter
+        child.path = path
+        child.types = types
         self._sources.append(child)
         return child_set
 
@@ -89,10 +90,10 @@ class Environment(object):
         :param path: The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
         :return: A DataSet that represents the data read from the given file as text lines.
         """
-        child = dict()
+        child = OperationInfo()
         child_set = DataSet(self, child)
-        child[_Fields.IDENTIFIER] = _Identifier.SOURCE_TEXT
-        child[_Fields.PATH] = path
+        child.identifier = _Identifier.SOURCE_TEXT
+        child.path = path
         self._sources.append(child)
         return child_set
 
@@ -106,10 +107,10 @@ class Environment(object):
         :param elements: The elements to make up the data set.
         :return: A DataSet representing the given list of elements.
         """
-        child = dict()
+        child = OperationInfo()
         child_set = DataSet(self, child)
-        child[_Fields.IDENTIFIER] = _Identifier.SOURCE_VALUE
-        child[_Fields.VALUES] = elements
+        child.identifier = _Identifier.SOURCE_VALUE
+        child.values = elements
         self._sources.append(child)
         return child_set
 
@@ -155,10 +156,10 @@ class Environment(object):
 
                 operator = None
                 for set in self._sets:
-                    if set[_Fields.ID] == id:
-                        operator = set[_Fields.OPERATOR]
-                    if set[_Fields.ID] == -id:
-                        operator = set[_Fields.COMBINEOP]
+                    if set.id == id:
+                        operator = set.operator
+                    if set.id == -id:
+                        operator = set.combineop
                 operator._configure(input_path, output_path, port, self)
                 operator._go()
                 sys.stdout.flush()
@@ -183,55 +184,55 @@ class Environment(object):
         x = len(self._sets) - 1
         while x > -1:
             child = self._sets[x]
-            child_type = child[_Fields.IDENTIFIER]
+            child_type = child.identifier
             if child_type in chainable:
-                parent = child[_Fields.PARENT]
-                parent_type = parent[_Fields.IDENTIFIER]
-                if len(parent[_Fields.SINKS]) == 0:
+                parent = child.parent
+                parent_type = parent.identifier
+                if len(parent.sinks) == 0:
                     if child_type == _Identifier.GROUPREDUCE or child_type == _Identifier.REDUCE:
-                        if child[_Fields.COMBINE]:
+                        if child.combine:
                             while parent_type == _Identifier.GROUP or parent_type == _Identifier.SORT:
-                                parent = parent[_Fields.PARENT]
-                                parent_type = parent[_Fields.IDENTIFIER]
-                            if parent_type in udf and len(parent[_Fields.CHILDREN]) == 1:
-                                if parent[_Fields.OPERATOR] is not None:
-                                    function = child[_Fields.COMBINEOP]
-                                    parent[_Fields.OPERATOR]._chain(function)
-                                    child[_Fields.COMBINE] = False
-                                    parent[_Fields.NAME] += " -> PythonCombine"
-                                    for bcvar in child[_Fields.BCVARS]:
+                                parent = parent.parent
+                                parent_type = parent.identifier
+                            if parent_type in udf and len(parent.children) == 1:
+                                if parent.operator is not None:
+                                    function = child.combineop
+                                    parent.operator._chain(function)
+                                    child.combine = False
+                                    parent.name += " -> PythonCombine"
+                                    for bcvar in child.bcvars:
                                         bcvar_copy = copy.deepcopy(bcvar)
-                                        bcvar_copy[_Fields.PARENT] = parent
+                                        bcvar_copy.parent = parent
                                         self._broadcast.append(bcvar_copy)
                     else:
-                        if parent_type in udf and len(parent[_Fields.CHILDREN]) == 1:
-                            parent_op = parent[_Fields.OPERATOR]
+                        if parent_type in udf and len(parent.children) == 1:
+                            parent_op = parent.operator
                             if parent_op is not None:
-                                function = child[_Fields.OPERATOR]
+                                function = child.operator
                                 parent_op._chain(function)
-                                parent[_Fields.NAME] += " -> " + child[_Fields.NAME]
-                                parent[_Fields.TYPES] = child[_Fields.TYPES]
-                                for grand_child in child[_Fields.CHILDREN]:
-                                    if grand_child[_Fields.IDENTIFIER] in multi_input:
-                                        if grand_child[_Fields.PARENT][_Fields.ID] == child[_Fields.ID]:
-                                            grand_child[_Fields.PARENT] = parent
+                                parent.name += " -> " + child.name
+                                parent.types = child.types
+                                for grand_child in child.children:
+                                    if grand_child.identifier in multi_input:
+                                        if grand_child.parent.id == child.id:
+                                            grand_child.parent = parent
                                         else:
-                                            grand_child[_Fields.OTHER] = parent
+                                            grand_child.other = parent
                                     else:
-                                        grand_child[_Fields.PARENT] = parent
-                                        parent[_Fields.CHILDREN].append(grand_child)
-                                parent[_Fields.CHILDREN].remove(child)
-                                for sink in child[_Fields.SINKS]:
-                                    sink[_Fields.PARENT] = parent
-                                    parent[_Fields.SINKS].append(sink)
-                                for bcvar in child[_Fields.BCVARS]:
-                                    bcvar[_Fields.PARENT] = parent
-                                    parent[_Fields.BCVARS].append(bcvar)
+                                        grand_child.parent = parent
+                                        parent.children.append(grand_child)
+                                parent.children.remove(child)
+                                for sink in child.sinks:
+                                    sink.parent = parent
+                                    parent.sinks.append(sink)
+                                for bcvar in child.bcvars:
+                                    bcvar.parent = parent
+                                    parent.bcvars.append(bcvar)
                                 self._remove_set((child))
             x -= 1
 
     def _remove_set(self, set):
-        self._sets[:] = [s for s in self._sets if s[_Fields.ID]!=set[_Fields.ID]]
+        self._sets[:] = [s for s in self._sets if s.id!=set.id]
 
     def _send_plan(self):
         self._send_parameters()
@@ -248,111 +249,111 @@ class Environment(object):
 
     def _send_sources(self):
         for source in self._sources:
-            identifier = source[_Fields.IDENTIFIER]
+            identifier = source.identifier
             collect = self._collector.collect
             collect(identifier)
-            collect(source[_Fields.ID])
+            collect(source.id)
             for case in Switch(identifier):
                 if case(_Identifier.SOURCE_CSV):
-                    collect(source[_Fields.PATH])
-                    collect(source[_Fields.DELIMITER_FIELD])
-                    collect(source[_Fields.DELIMITER_LINE])
-                    collect(source[_Fields.TYPES])
+                    collect(source.path)
+                    collect(source.delimiter_field)
+                    collect(source.delimiter_line)
+                    collect(source.types)
                     break
                 if case(_Identifier.SOURCE_TEXT):
-                    collect(source[_Fields.PATH])
+                    collect(source.path)
                     break
                 if case(_Identifier.SOURCE_VALUE):
-                    collect(len(source[_Fields.VALUES]))
-                    for value in source[_Fields.VALUES]:
+                    collect(len(source.values))
+                    for value in source.values:
                         collect(value)
                     break
 
     def _send_operations(self):
         collect = self._collector.collect
         for set in self._sets:
-            identifier = set.get(_Fields.IDENTIFIER)
-            collect(set[_Fields.IDENTIFIER])
-            collect(set[_Fields.ID])
-            collect(set[_Fields.PARENT][_Fields.ID])
+            identifier = set.identifier
+            collect(set.identifier)
+            collect(set.id)
+            collect(set.parent.id)
             for case in Switch(identifier):
                 if case(_Identifier.SORT):
-                    collect(set[_Fields.FIELD])
-                    collect(set[_Fields.ORDER])
+                    collect(set.field)
+                    collect(set.order)
                     break
                 if case(_Identifier.GROUP):
-                    collect(set[_Fields.KEYS])
+                    collect(set.keys)
                     break
                 if case(_Identifier.COGROUP):
-                    collect(set[_Fields.OTHER][_Fields.ID])
-                    collect(set[_Fields.KEY1])
-                    collect(set[_Fields.KEY2])
-                    collect(set[_Fields.TYPES])
-                    collect(set[_Fields.NAME])
+                    collect(set.other.id)
+                    collect(set.key1)
+                    collect(set.key2)
+                    collect(set.types)
+                    collect(set.name)
                     break
                 if case(_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST):
-                    collect(set[_Fields.OTHER][_Fields.ID])
-                    collect(set[_Fields.TYPES])
-                    collect(len(set[_Fields.PROJECTIONS]))
-                    for p in set[_Fields.PROJECTIONS]:
+                    collect(set.other.id)
+                    collect(set.types)
+                    collect(len(set.projections))
+                    for p in set.projections:
                         collect(p[0])
                         collect(p[1])
-                    collect(set[_Fields.NAME])
+                    collect(set.name)
                     break
                 if case(_Identifier.REDUCE, _Identifier.GROUPREDUCE):
-                    collect(set[_Fields.TYPES])
-                    collect(set[_Fields.COMBINE])
-                    collect(set[_Fields.NAME])
+                    collect(set.types)
+                    collect(set.combine)
+                    collect(set.name)
                     break
                 if case(_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT):
-                    collect(set[_Fields.KEY1])
-                    collect(set[_Fields.KEY2])
-                    collect(set[_Fields.OTHER][_Fields.ID])
-                    collect(set[_Fields.TYPES])
-                    collect(len(set[_Fields.PROJECTIONS]))
-                    for p in set[_Fields.PROJECTIONS]:
+                    collect(set.key1)
+                    collect(set.key2)
+                    collect(set.other.id)
+                    collect(set.types)
+                    collect(len(set.projections))
+                    for p in set.projections:
                         collect(p[0])
                         collect(p[1])
-                    collect(set[_Fields.NAME])
+                    collect(set.name)
                     break
                 if case(_Identifier.MAP, _Identifier.MAPPARTITION, _Identifier.FLATMAP, _Identifier.FILTER):
-                    collect(set[_Fields.TYPES])
-                    collect(set[_Fields.NAME])
+                    collect(set.types)
+                    collect(set.name)
                     break
                 if case(_Identifier.UNION):
-                    collect(set[_Fields.OTHER][_Fields.ID])
+                    collect(set.other.id)
                     break
                 if case(_Identifier.PROJECTION):
-                    collect(set[_Fields.KEYS])
+                    collect(set.keys)
                     break
                 if case():
                     raise KeyError("Environment._send_child_sets(): Invalid operation identifier: " + str(identifier))
 
     def _send_sinks(self):
         for sink in self._sinks:
-            identifier = sink[_Fields.IDENTIFIER]
+            identifier = sink.identifier
             collect = self._collector.collect
             collect(identifier)
-            collect(sink[_Fields.PARENT][_Fields.ID])
+            collect(sink.parent.id)
             for case in Switch(identifier):
                 if case(_Identifier.SINK_CSV):
-                    collect(sink[_Fields.PATH])
-                    collect(sink[_Fields.DELIMITER_FIELD])
-                    collect(sink[_Fields.DELIMITER_LINE])
-                    collect(sink[_Fields.WRITE_MODE])
+                    collect(sink.path)
+                    collect(sink.delimiter_field)
+                    collect(sink.delimiter_line)
+                    collect(sink.write_mode)
                     break;
                 if case(_Identifier.SINK_TEXT):
-                    collect(sink[_Fields.PATH])
-                    collect(sink[_Fields.WRITE_MODE])
+                    collect(sink.path)
+                    collect(sink.write_mode)
                     break
                 if case(_Identifier.SINK_PRINT):
-                    collect(sink[_Fields.TO_ERR])
+                    collect(sink.to_err)
                     break
 
     def _send_broadcast(self):
         collect = self._collector.collect
         for entry in self._broadcast:
             collect(_Identifier.BROADCAST)
-            collect(entry[_Fields.PARENT][_Fields.ID])
-            collect(entry[_Fields.OTHER][_Fields.ID])
-            collect(entry[_Fields.NAME])
+            collect(entry.parent.id)
+            collect(entry.other.id)
+            collect(entry.name)

http://git-wip-us.apache.org/repos/asf/flink/blob/5d37fe14/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
new file mode 100644
index 0000000..faa2215
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
@@ -0,0 +1,50 @@
+# ###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Constants import WriteMode
+
+
+class OperationInfo():
+    def __init__(self, info=None):
+        if info is None:
+            self.parent = None
+            self.other = None
+            self.identifier = None
+            self.field = None
+            self.order = None
+            self.keys = None
+            self.key1 = None
+            self.key2 = None
+            self.types = None
+            self.operator = None
+            self.name = None
+            self.combine = False
+            self.delimiter_line = "\n"
+            self.delimiter_field = ","
+            self.write_mode = WriteMode.NO_OVERWRITE
+            self.sinks = []
+            self.children = []
+            self.combineop = None
+            self.path = None
+            self.values = []
+            self.projections = []
+            self.bcvars = []
+            self.id = None
+            self.to_err = False
+        else:
+            self.__dict__.update(info.__dict__)
+

http://git-wip-us.apache.org/repos/asf/flink/blob/5d37fe14/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
index a3b8d07..b2063eb 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
@@ -16,7 +16,6 @@
 # limitations under the License.
 ################################################################################
 from flink.plan.Environment import get_environment
-from flink.plan.Constants import _Fields
 from flink.plan.Constants import INT, STRING, BOOL, FLOAT
 import sys
 
@@ -29,28 +28,28 @@ if __name__ == "__main__":
 
     direct_from_source = d1.filter(lambda x:True)
 
-    if direct_from_source._info[_Fields.TYPES] != ("hello", 4, 3.2, True):
+    if direct_from_source._info.types != ("hello", 4, 3.2, True):
         sys.exit("Error deducting type directly from source.")
 
     from_common_udf = d1.map(lambda x: x[3], BOOL).filter(lambda x:True)
 
-    if from_common_udf._info[_Fields.TYPES] != BOOL:
+    if from_common_udf._info.types != BOOL:
         sys.exit("Error deducting type from common udf.")
 
     through_projection = d1.project(3, 2).filter(lambda x:True)
 
-    if through_projection._info[_Fields.TYPES] != (True, 3.2):
+    if through_projection._info.types != (True, 3.2):
         sys.exit("Error deducting type through projection.")
 
     through_default_op = d1.cross(d2).filter(lambda x:True)
 
-    if through_default_op._info[_Fields.TYPES] != (("hello", 4, 3.2, True), "world"):
-        sys.exit("Error deducting type through default J/C." +str(through_default_op._info[_Fields.TYPES]))
+    if through_default_op._info.types != (("hello", 4, 3.2, True), "world"):
+        sys.exit("Error deducting type through default J/C." +str(through_default_op._info.types))
 
     through_prj_op = d1.cross(d2).project_first(1, 0).project_second().project_first(3, 2).filter(lambda x:True)
 
-    if through_prj_op._info[_Fields.TYPES] != (4, "hello", "world", True, 3.2):
-        sys.exit("Error deducting type through projection J/C. "+str(through_prj_op._info[_Fields.TYPES]))
+    if through_prj_op._info.types != (4, "hello", "world", True, 3.2):
+        sys.exit("Error deducting type through projection J/C. "+str(through_prj_op._info.types))
 
 
     env = get_environment()