Posted to commits@flink.apache.org by ch...@apache.org on 2015/11/18 12:27:26 UTC
flink git commit: [FLINK-2441] Introduce Python OperationInfo
Repository: flink
Updated Branches:
refs/heads/master 99d52cc69 -> 5d37fe146
[FLINK-2441] Introduce Python OperationInfo
This closes #1352.
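The change replaces the string-keyed dicts that plan nodes previously were (built from the now-removed _Fields constants in Constants.py) with a dedicated OperationInfo class whose attributes carry the same data and whose constructor supplies the defaults that each Set subclass used to initialise by hand; it also drops the copy_set flag from the Set hierarchy. A minimal sketch of the pattern, using a hypothetical stand-in class rather than the real flink.plan.OperationInfo (which needs the flink-python sources on the path):

# Before this commit: plan nodes were plain dicts keyed by _Fields string constants.
old_child = dict()
old_child["identifier"] = "map"     # was: child[_Fields.IDENTIFIER] = _Identifier.MAP
old_child["children"] = []          # every call site had to remember every key

# After: plain attributes with defaults set centrally, mirroring the new class.
class OperationInfoSketch(object):  # hypothetical stand-in for flink.plan.OperationInfo
    def __init__(self):
        self.identifier = None
        self.parent = None
        self.children = []          # defaults now live in one __init__

new_child = OperationInfoSketch()
new_child.identifier = "map"
assert new_child.children == []     # no per-operator initialisation needed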
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d37fe14
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d37fe14
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d37fe14
Branch: refs/heads/master
Commit: 5d37fe146d57f6d661f7fc9b86c66a6e8a63ab0b
Parents: 99d52cc
Author: zentol <ch...@apache.org>
Authored: Fri Jul 24 20:18:47 2015 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Nov 18 12:26:52 2015 +0100
----------------------------------------------------------------------
.../flink/python/api/flink/plan/Constants.py | 28 --
.../flink/python/api/flink/plan/DataSet.py | 406 +++++++++----------
.../flink/python/api/flink/plan/Environment.py | 209 +++++-----
.../python/api/flink/plan/OperationInfo.py | 50 +++
.../flink/python/api/test_type_deduction.py | 15 +-
5 files changed, 357 insertions(+), 351 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5d37fe14/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
index b0d79e8..0c9fe80 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
@@ -47,34 +47,6 @@ class _Identifier(object):
BROADCAST = "broadcast"
-class _Fields(object):
- PARENT = "parent"
- OTHER = "other_set"
- SINKS = "sinks"
- IDENTIFIER = "identifier"
- FIELD = "field"
- ORDER = "order"
- KEYS = "keys"
- KEY1 = "key1"
- KEY2 = "key2"
- TYPES = "types"
- OPERATOR = "operator"
- META = "meta"
- NAME = "name"
- COMBINE = "combine"
- DELIMITER_LINE = "del_l"
- DELIMITER_FIELD = "del_f"
- WRITE_MODE = "write"
- PATH = "path"
- VALUES = "values"
- COMBINEOP = "combineop"
- CHILDREN = "children"
- BCVARS = "bcvars"
- PROJECTIONS = "projections"
- ID = "id"
- TO_ERR = "to_error"
-
-
class WriteMode(object):
NO_OVERWRITE = 0
OVERWRITE = 1
http://git-wip-us.apache.org/repos/asf/flink/blob/5d37fe14/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
index 390a08d..7ef5488 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
@@ -19,7 +19,8 @@ import inspect
import copy
import types as TYPES
-from flink.plan.Constants import _Fields, _Identifier, WriteMode, STRING
+from flink.plan.Constants import _Identifier, WriteMode, STRING
+from flink.plan.OperationInfo import OperationInfo
from flink.functions.CoGroupFunction import CoGroupFunction
from flink.functions.FilterFunction import FilterFunction
from flink.functions.FlatMapFunction import FlatMapFunction
@@ -36,29 +37,29 @@ def deduct_output_type(dataset):
default = set([_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.JOINT, _Identifier.JOINH, _Identifier.JOIN])
while True:
- dataset_type = dataset[_Fields.IDENTIFIER]
+ dataset_type = dataset.identifier
if dataset_type in skip:
- dataset = dataset[_Fields.PARENT]
+ dataset = dataset.parent
continue
if dataset_type in source:
if dataset_type == _Identifier.SOURCE_TEXT:
return STRING
if dataset_type == _Identifier.SOURCE_VALUE:
- return dataset[_Fields.VALUES][0]
+ return dataset.values[0]
if dataset_type == _Identifier.SOURCE_CSV:
- return dataset[_Fields.TYPES]
+ return dataset.types
if dataset_type == _Identifier.PROJECTION:
- return tuple([deduct_output_type(dataset[_Fields.PARENT])[k] for k in dataset[_Fields.KEYS]])
+ return tuple([deduct_output_type(dataset.parent)[k] for k in dataset.keys])
if dataset_type in default:
- if dataset[_Fields.OPERATOR] is not None: #udf-join/cross
- return dataset[_Fields.TYPES]
- if len(dataset[_Fields.PROJECTIONS]) == 0: #defaultjoin/-cross
- return (deduct_output_type(dataset[_Fields.PARENT]), deduct_output_type(dataset[_Fields.OTHER]))
+ if dataset.operator is not None: #udf-join/cross
+ return dataset.types
+ if len(dataset.projections) == 0: #defaultjoin/-cross
+ return (deduct_output_type(dataset.parent), deduct_output_type(dataset.other))
else: #projectjoin/-cross
- t1 = deduct_output_type(dataset[_Fields.PARENT])
- t2 = deduct_output_type(dataset[_Fields.OTHER])
+ t1 = deduct_output_type(dataset.parent)
+ t2 = deduct_output_type(dataset.other)
out_type = []
- for prj in dataset[_Fields.PROJECTIONS]:
+ for prj in dataset.projections:
if len(prj[1]) == 0: #projection on non-tuple dataset
if prj[0] == "first":
out_type.append(t1)
@@ -71,30 +72,25 @@ def deduct_output_type(dataset):
else:
out_type.append(t2[key])
return tuple(out_type)
- return dataset[_Fields.TYPES]
+ return dataset.types
class Set(object):
- def __init__(self, env, info, copy_set=False):
+ def __init__(self, env, info):
self._env = env
self._info = info
- if not copy_set:
- self._info[_Fields.ID] = env._counter
- self._info[_Fields.BCVARS] = []
- self._info[_Fields.CHILDREN] = []
- self._info[_Fields.SINKS] = []
- self._info[_Fields.NAME] = None
- env._counter += 1
+ info.id = env._counter
+ env._counter += 1
def output(self, to_error=False):
"""
Writes a DataSet to the standard output stream (stdout).
"""
- child = dict()
- child[_Fields.IDENTIFIER] = _Identifier.SINK_PRINT
- child[_Fields.PARENT] = self._info
- child[_Fields.TO_ERR] = to_error
- self._info[_Fields.SINKS].append(child)
+ child = OperationInfo()
+ child.identifier = _Identifier.SINK_PRINT
+ child.parent = self._info
+ child.to_err = to_error
+ self._info.sinks.append(child)
self._env._sinks.append(child)
def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE):
@@ -104,12 +100,12 @@ class Set(object):
:param path: The path pointing to the location the text file is written to.
:param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten
"""
- child = dict()
- child[_Fields.IDENTIFIER] = _Identifier.SINK_TEXT
- child[_Fields.PARENT] = self._info
- child[_Fields.PATH] = path
- child[_Fields.WRITE_MODE] = write_mode
- self._info[_Fields.SINKS].append(child)
+ child = OperationInfo()
+ child.identifier = _Identifier.SINK_TEXT
+ child.parent = self._info
+ child.path = path
+ child.write_mode = write_mode
+ self._info.sinks.append(child)
self._env._sinks.append(child)
def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE):
@@ -120,14 +116,14 @@ class Set(object):
:param path: The path pointing to the location the CSV file is written to.
:param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten
"""
- child = dict()
- child[_Fields.IDENTIFIER] = _Identifier.SINK_CSV
- child[_Fields.PATH] = path
- child[_Fields.PARENT] = self._info
- child[_Fields.DELIMITER_FIELD] = field_delimiter
- child[_Fields.DELIMITER_LINE] = line_delimiter
- child[_Fields.WRITE_MODE] = write_mode
- self._info[_Fields.SINKS].append(child)
+ child = OperationInfo()
+ child.identifier = _Identifier.SINK_CSV
+ child.path = path
+ child.parent = self._info
+ child.delimiter_field = field_delimiter
+ child.delimiter_line = line_delimiter
+ child.write_mode = write_mode
+ self._info.sinks.append(child)
self._env._sinks.append(child)
def reduce_group(self, operator, types, combinable=False):
@@ -147,28 +143,26 @@ class Set(object):
f = operator
operator = GroupReduceFunction()
operator.reduce = f
- child = dict()
+ child = OperationInfo()
child_set = OperatorSet(self._env, child)
- child[_Fields.IDENTIFIER] = _Identifier.GROUPREDUCE
- child[_Fields.PARENT] = self._info
- child[_Fields.OPERATOR] = copy.deepcopy(operator)
- child[_Fields.OPERATOR]._combine = False
- child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
- child[_Fields.TYPES] = types
- child[_Fields.COMBINE] = combinable
- child[_Fields.COMBINEOP] = operator
- child[_Fields.COMBINEOP]._combine = True
- child[_Fields.NAME] = "PythonGroupReduce"
- self._info[_Fields.CHILDREN].append(child)
+ child.identifier = _Identifier.GROUPREDUCE
+ child.parent = self._info
+ child.operator = copy.deepcopy(operator)
+ child.operator._combine = False
+ child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ child.types = types
+ child.combine = combinable
+ child.combineop = operator
+ child.combineop._combine = True
+ child.name = "PythonGroupReduce"
+ self._info.children.append(child)
self._env._sets.append(child)
return child_set
class ReduceSet(Set):
- def __init__(self, env, info, copy_set=False):
- super(ReduceSet, self).__init__(env, info, copy_set)
- if not copy_set:
- self._is_chained = False
+ def __init__(self, env, info):
+ super(ReduceSet, self).__init__(env, info)
def reduce(self, operator):
"""
@@ -184,24 +178,24 @@ class ReduceSet(Set):
f = operator
operator = ReduceFunction()
operator.reduce = f
- child = dict()
+ child = OperationInfo()
child_set = OperatorSet(self._env, child)
- child[_Fields.IDENTIFIER] = _Identifier.REDUCE
- child[_Fields.PARENT] = self._info
- child[_Fields.OPERATOR] = operator
- child[_Fields.COMBINEOP] = operator
- child[_Fields.COMBINE] = False
- child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
- child[_Fields.NAME] = "PythonReduce"
- child[_Fields.TYPES] = deduct_output_type(self._info)
- self._info[_Fields.CHILDREN].append(child)
+ child.identifier = _Identifier.REDUCE
+ child.parent = self._info
+ child.operator = operator
+ child.combineop = operator
+ child.combine = False
+ child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ child.name = "PythonReduce"
+ child.types = deduct_output_type(self._info)
+ self._info.children.append(child)
self._env._sets.append(child)
return child_set
class DataSet(ReduceSet):
- def __init__(self, env, info, copy_set=False):
- super(DataSet, self).__init__(env, info, copy_set)
+ def __init__(self, env, info):
+ super(DataSet, self).__init__(env, info)
def project(self, *fields):
"""
@@ -215,12 +209,12 @@ class DataSet(ReduceSet):
:return: The projected DataSet.
"""
- child = dict()
+ child = OperationInfo()
child_set = DataSet(self._env, child)
- child[_Fields.IDENTIFIER] = _Identifier.PROJECTION
- child[_Fields.PARENT] = self._info
- child[_Fields.KEYS] = fields
- self._info[_Fields.CHILDREN].append(child)
+ child.identifier = _Identifier.PROJECTION
+ child.parent = self._info
+ child.keys = fields
+ self._info.children.append(child)
self._env._sets.append(child)
return child_set
@@ -237,14 +231,14 @@ class DataSet(ReduceSet):
:param keys: One or more field positions on which the DataSet will be grouped.
:return:A Grouping on which a transformation needs to be applied to obtain a transformed DataSet.
"""
- child = dict()
+ child = OperationInfo()
child_chain = []
child_set = UnsortedGrouping(self._env, child, child_chain)
- child[_Fields.IDENTIFIER] = _Identifier.GROUP
- child[_Fields.PARENT] = self._info
- child[_Fields.KEYS] = keys
+ child.identifier = _Identifier.GROUP
+ child.parent = self._info
+ child.keys = keys
child_chain.append(child)
- self._info[_Fields.CHILDREN].append(child)
+ self._info.children.append(child)
self._env._sets.append(child)
return child_set
@@ -261,13 +255,13 @@ class DataSet(ReduceSet):
:param other_set: The other DataSet of the CoGroup transformation.
:return:A CoGroupOperator to continue the definition of the CoGroup transformation.
"""
- child = dict()
- other_set._info[_Fields.CHILDREN].append(child)
+ child = OperationInfo()
+ other_set._info.children.append(child)
child_set = CoGroupOperatorWhere(self._env, child)
- child[_Fields.IDENTIFIER] = _Identifier.COGROUP
- child[_Fields.PARENT] = self._info
- child[_Fields.OTHER] = other_set._info
- self._info[_Fields.CHILDREN].append(child)
+ child.identifier = _Identifier.COGROUP
+ child.parent = self._info
+ child.other = other_set._info
+ self._info.children.append(child)
return child_set
def cross(self, other_set):
@@ -308,16 +302,13 @@ class DataSet(ReduceSet):
return self._cross(other_set, _Identifier.CROSST)
def _cross(self, other_set, identifier):
- child = dict()
+ child = OperationInfo()
child_set = CrossOperator(self._env, child)
- child[_Fields.IDENTIFIER] = identifier
- child[_Fields.PARENT] = self._info
- child[_Fields.OTHER] = other_set._info
- child[_Fields.PROJECTIONS] = []
- child[_Fields.OPERATOR] = None
- child[_Fields.META] = None
- self._info[_Fields.CHILDREN].append(child)
- other_set._info[_Fields.CHILDREN].append(child)
+ child.identifier = identifier
+ child.parent = self._info
+ child.other = other_set._info
+ self._info.children.append(child)
+ other_set._info.children.append(child)
self._env._sets.append(child)
return child_set
@@ -335,15 +326,15 @@ class DataSet(ReduceSet):
f = operator
operator = FilterFunction()
operator.filter = f
- child = dict()
+ child = OperationInfo()
child_set = OperatorSet(self._env, child)
- child[_Fields.IDENTIFIER] = _Identifier.FILTER
- child[_Fields.PARENT] = self._info
- child[_Fields.OPERATOR] = operator
- child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
- child[_Fields.NAME] = "PythonFilter"
- child[_Fields.TYPES] = deduct_output_type(self._info)
- self._info[_Fields.CHILDREN].append(child)
+ child.identifier = _Identifier.FILTER
+ child.parent = self._info
+ child.operator = operator
+ child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ child.name = "PythonFilter"
+ child.types = deduct_output_type(self._info)
+ self._info.children.append(child)
self._env._sets.append(child)
return child_set
@@ -362,15 +353,15 @@ class DataSet(ReduceSet):
f = operator
operator = FlatMapFunction()
operator.flat_map = f
- child = dict()
+ child = OperationInfo()
child_set = OperatorSet(self._env, child)
- child[_Fields.IDENTIFIER] = _Identifier.FLATMAP
- child[_Fields.PARENT] = self._info
- child[_Fields.OPERATOR] = operator
- child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
- child[_Fields.TYPES] = types
- child[_Fields.NAME] = "PythonFlatMap"
- self._info[_Fields.CHILDREN].append(child)
+ child.identifier = _Identifier.FLATMAP
+ child.parent = self._info
+ child.operator = operator
+ child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ child.types = types
+ child.name = "PythonFlatMap"
+ self._info.children.append(child)
self._env._sets.append(child)
return child_set
@@ -412,16 +403,13 @@ class DataSet(ReduceSet):
return self._join(other_set, _Identifier.JOINT)
def _join(self, other_set, identifier):
- child = dict()
+ child = OperationInfo()
child_set = JoinOperatorWhere(self._env, child)
- child[_Fields.IDENTIFIER] = identifier
- child[_Fields.PARENT] = self._info
- child[_Fields.OTHER] = other_set._info
- child[_Fields.OPERATOR] = None
- child[_Fields.META] = None
- child[_Fields.PROJECTIONS] = []
- self._info[_Fields.CHILDREN].append(child)
- other_set._info[_Fields.CHILDREN].append(child)
+ child.identifier = identifier
+ child.parent = self._info
+ child.other = other_set._info
+ self._info.children.append(child)
+ other_set._info.children.append(child)
self._env._sets.append(child)
return child_set
@@ -440,15 +428,15 @@ class DataSet(ReduceSet):
f = operator
operator = MapFunction()
operator.map = f
- child = dict()
+ child = OperationInfo()
child_set = OperatorSet(self._env, child)
- child[_Fields.IDENTIFIER] = _Identifier.MAP
- child[_Fields.PARENT] = self._info
- child[_Fields.OPERATOR] = operator
- child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
- child[_Fields.TYPES] = types
- child[_Fields.NAME] = "PythonMap"
- self._info[_Fields.CHILDREN].append(child)
+ child.identifier = _Identifier.MAP
+ child.parent = self._info
+ child.operator = operator
+ child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ child.types = types
+ child.name = "PythonMap"
+ self._info.children.append(child)
self._env._sets.append(child)
return child_set
@@ -471,15 +459,15 @@ class DataSet(ReduceSet):
f = operator
operator = MapPartitionFunction()
operator.map_partition = f
- child = dict()
+ child = OperationInfo()
child_set = OperatorSet(self._env, child)
- child[_Fields.IDENTIFIER] = _Identifier.MAPPARTITION
- child[_Fields.PARENT] = self._info
- child[_Fields.OPERATOR] = operator
- child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
- child[_Fields.TYPES] = types
- child[_Fields.NAME] = "PythonMapPartition"
- self._info[_Fields.CHILDREN].append(child)
+ child.identifier = _Identifier.MAPPARTITION
+ child.parent = self._info
+ child.operator = operator
+ child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ child.types = types
+ child.name = "PythonMapPartition"
+ self._info.children.append(child)
self._env._sets.append(child)
return child_set
@@ -492,28 +480,28 @@ class DataSet(ReduceSet):
:param other_set: The other DataSet which is unioned with the current DataSet.
:return:The resulting DataSet.
"""
- child = dict()
+ child = OperationInfo()
child_set = DataSet(self._env, child)
- child[_Fields.IDENTIFIER] = _Identifier.UNION
- child[_Fields.PARENT] = self._info
- child[_Fields.OTHER] = other_set._info
- self._info[_Fields.CHILDREN].append(child)
- other_set._info[_Fields.CHILDREN].append(child)
+ child.identifier = _Identifier.UNION
+ child.parent = self._info
+ child.other = other_set._info
+ self._info.children.append(child)
+ other_set._info.children.append(child)
self._env._sets.append(child)
return child_set
class OperatorSet(DataSet):
- def __init__(self, env, info, copy_set=False):
- super(OperatorSet, self).__init__(env, info, copy_set)
+ def __init__(self, env, info):
+ super(OperatorSet, self).__init__(env, info)
def with_broadcast_set(self, name, set):
- child = dict()
- child[_Fields.PARENT] = self._info
- child[_Fields.OTHER] = set._info
- child[_Fields.NAME] = name
- self._info[_Fields.BCVARS].append(child)
- set._info[_Fields.CHILDREN].append(child)
+ child = OperationInfo()
+ child.parent = self._info
+ child.other = set._info
+ child.name = name
+ self._info.bcvars.append(child)
+ set._info.children.append(child)
self._env._broadcast.append(child)
return self
@@ -523,9 +511,7 @@ class Grouping(object):
self._env = env
self._child_chain = child_chain
self._info = info
- info[_Fields.ID] = env._counter
- info[_Fields.CHILDREN] = []
- info[_Fields.SINKS] = []
+ info.id = env._counter
env._counter += 1
def reduce_group(self, operator, types, combinable=False):
@@ -545,21 +531,21 @@ class Grouping(object):
f = operator
operator = GroupReduceFunction()
operator.reduce = f
- operator._set_grouping_keys(self._child_chain[0][_Fields.KEYS])
- operator._set_sort_ops([(x[_Fields.FIELD], x[_Fields.ORDER]) for x in self._child_chain[1:]])
- child = dict()
+ operator._set_grouping_keys(self._child_chain[0].keys)
+ operator._set_sort_ops([(x.field, x.order) for x in self._child_chain[1:]])
+ child = OperationInfo()
child_set = OperatorSet(self._env, child)
- child[_Fields.IDENTIFIER] = _Identifier.GROUPREDUCE
- child[_Fields.PARENT] = self._info
- child[_Fields.OPERATOR] = copy.deepcopy(operator)
- child[_Fields.OPERATOR]._combine = False
- child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
- child[_Fields.TYPES] = types
- child[_Fields.COMBINE] = combinable
- child[_Fields.COMBINEOP] = operator
- child[_Fields.COMBINEOP]._combine = True
- child[_Fields.NAME] = "PythonGroupReduce"
- self._info[_Fields.CHILDREN].append(child)
+ child.identifier = _Identifier.GROUPREDUCE
+ child.parent = self._info
+ child.operator = copy.deepcopy(operator)
+ child.operator._combine = False
+ child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ child.types = types
+ child.combine = combinable
+ child.combineop = operator
+ child.combineop._combine = True
+ child.name = "PythonGroupReduce"
+ self._info.children.append(child)
self._env._sets.append(child)
return child_set
@@ -575,13 +561,13 @@ class Grouping(object):
:param order: The Order in which the specified Tuple field is sorted. See DataSet.Order.
:return:A SortedGrouping with specified order of group element.
"""
- child = dict()
+ child = OperationInfo()
child_set = SortedGrouping(self._env, child, self._child_chain)
- child[_Fields.IDENTIFIER] = _Identifier.SORT
- child[_Fields.PARENT] = self._info
- child[_Fields.FIELD] = field
- child[_Fields.ORDER] = order
- self._info[_Fields.CHILDREN].append(child)
+ child.identifier = _Identifier.SORT
+ child.parent = self._info
+ child.field = field
+ child.order = order
+ self._info.children.append(child)
self._child_chain.append(child)
self._env._sets.append(child)
return child_set
@@ -601,22 +587,22 @@ class UnsortedGrouping(Grouping):
:param operator:The ReduceFunction that is applied on the DataSet.
:return:A ReduceOperator that represents the reduced DataSet.
"""
- operator._set_grouping_keys(self._child_chain[0][_Fields.KEYS])
+ operator._set_grouping_keys(self._child_chain[0].keys)
for i in self._child_chain:
self._env._sets.append(i)
- child = dict()
+ child = OperationInfo()
child_set = OperatorSet(self._env, child)
- child[_Fields.IDENTIFIER] = _Identifier.REDUCE
- child[_Fields.PARENT] = self._info
- child[_Fields.OPERATOR] = copy.deepcopy(operator)
- child[_Fields.OPERATOR]._combine = False
- child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
- child[_Fields.COMBINE] = True
- child[_Fields.COMBINEOP] = operator
- child[_Fields.COMBINEOP]._combine = True
- child[_Fields.NAME] = "PythonReduce"
- child[_Fields.TYPES] = deduct_output_type(self._info)
- self._info[_Fields.CHILDREN].append(child)
+ child.identifier = _Identifier.REDUCE
+ child.parent = self._info
+ child.operator = copy.deepcopy(operator)
+ child.operator._combine = False
+ child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ child.combine = True
+ child.combineop = operator
+ child.combineop._combine = True
+ child.name = "PythonReduce"
+ child.types = deduct_output_type(self._info)
+ self._info.children.append(child)
self._env._sets.append(child)
return child_set
@@ -642,7 +628,7 @@ class CoGroupOperatorWhere(object):
:param fields: The indexes of the Tuple fields of the first co-grouped DataSets that should be used as keys.
:return: An incomplete CoGroup transformation.
"""
- self._info[_Fields.KEY1] = fields
+ self._info.key1 = fields
return CoGroupOperatorTo(self._env, self._info)
@@ -661,7 +647,7 @@ class CoGroupOperatorTo(object):
:param fields: The indexes of the Tuple fields of the second co-grouped DataSet that should be used as keys.
:return: An incomplete CoGroup transformation.
"""
- self._info[_Fields.KEY2] = fields
+ self._info.key2 = fields
return CoGroupOperatorUsing(self._env, self._info)
@@ -686,12 +672,12 @@ class CoGroupOperatorUsing(object):
operator = CoGroupFunction()
operator.co_group = f
new_set = OperatorSet(self._env, self._info)
- operator._keys1 = self._info[_Fields.KEY1]
- operator._keys2 = self._info[_Fields.KEY2]
- self._info[_Fields.OPERATOR] = operator
- self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
- self._info[_Fields.TYPES] = types
- self._info[_Fields.NAME] = "PythonCoGroup"
+ operator._keys1 = self._info.key1
+ operator._keys2 = self._info.key2
+ self._info.operator = operator
+ self._info.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ self._info.types = types
+ self._info.name = "PythonCoGroup"
self._env._sets.append(self._info)
return new_set
@@ -712,7 +698,7 @@ class JoinOperatorWhere(object):
:return:An incomplete Join transformation.
"""
- self._info[_Fields.KEY1] = fields
+ self._info.key1 = fields
return JoinOperatorTo(self._env, self._info)
@@ -731,7 +717,7 @@ class JoinOperatorTo(object):
:param fields:The indexes of the Tuple fields of the second join DataSet that should be used as keys.
:return:An incomplete Join Transformation.
"""
- self._info[_Fields.KEY2] = fields
+ self._info.key2 = fields
return JoinOperator(self._env, self._info)
@@ -750,7 +736,7 @@ class JoinOperatorProjection(DataSet):
:param fields: The indexes of the selected fields.
:return: An incomplete JoinProjection.
"""
- self._info[_Fields.PROJECTIONS].append(("first", fields))
+ self._info.projections.append(("first", fields))
return self
def project_second(self, *fields):
@@ -764,14 +750,13 @@ class JoinOperatorProjection(DataSet):
:param fields: The indexes of the selected fields.
:return: An incomplete JoinProjection.
"""
- self._info[_Fields.PROJECTIONS].append(("second", fields))
+ self._info.projections.append(("second", fields))
return self
class JoinOperator(DataSet):
def __init__(self, env, info):
super(JoinOperator, self).__init__(env, info)
- self._info[_Fields.TYPES] = None
def project_first(self, *fields):
"""
@@ -813,12 +798,12 @@ class JoinOperator(DataSet):
f = operator
operator = JoinFunction()
operator.join = f
- self._info[_Fields.OPERATOR] = operator
- self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
- self._info[_Fields.TYPES] = types
- self._info[_Fields.NAME] = "PythonJoin"
+ self._info.operator = operator
+ self._info.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ self._info.types = types
+ self._info.name = "PythonJoin"
self._env._sets.append(self._info)
- return OperatorSet(self._env, self._info, copy_set=True)
+ return OperatorSet(self._env, self._info)
class CrossOperatorProjection(DataSet):
@@ -836,7 +821,7 @@ class CrossOperatorProjection(DataSet):
:param fields: The indexes of the selected fields.
:return: An incomplete CrossProjection.
"""
- self._info[_Fields.PROJECTIONS].append(("first", fields))
+ self._info.projections.append(("first", fields))
return self
def project_second(self, *fields):
@@ -850,14 +835,13 @@ class CrossOperatorProjection(DataSet):
:param fields: The indexes of the selected fields.
:return: An incomplete CrossProjection.
"""
- self._info[_Fields.PROJECTIONS].append(("second", fields))
+ self._info.projections.append(("second", fields))
return self
class CrossOperator(DataSet):
def __init__(self, env, info):
super(CrossOperator, self).__init__(env, info)
- info[_Fields.TYPES] = None
def project_first(self, *fields):
"""
@@ -899,8 +883,8 @@ class CrossOperator(DataSet):
f = operator
operator = CrossFunction()
operator.cross = f
- self._info[_Fields.OPERATOR] = operator
- self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
- self._info[_Fields.TYPES] = types
- self._info[_Fields.NAME] = "PythonCross"
- return OperatorSet(self._env, self._info, copy_set=True)
+ self._info.operator = operator
+ self._info.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+ self._info.types = types
+ self._info.name = "PythonCross"
+ return OperatorSet(self._env, self._info)
http://git-wip-us.apache.org/repos/asf/flink/blob/5d37fe14/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 8647686..bea6212 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -18,7 +18,8 @@
from flink.connection import Connection
from flink.connection import Collector
from flink.plan.DataSet import DataSet
-from flink.plan.Constants import _Fields, _Identifier
+from flink.plan.Constants import _Identifier
+from flink.plan.OperationInfo import OperationInfo
from flink.utilities import Switch
import copy
import sys
@@ -70,13 +71,13 @@ class Environment(object):
:param types: Specifies the types for the CSV fields.
:return:A CsvReader that can be used to configure the CSV input.
"""
- child = dict()
+ child = OperationInfo()
child_set = DataSet(self, child)
- child[_Fields.IDENTIFIER] = _Identifier.SOURCE_CSV
- child[_Fields.DELIMITER_LINE] = line_delimiter
- child[_Fields.DELIMITER_FIELD] = field_delimiter
- child[_Fields.PATH] = path
- child[_Fields.TYPES] = types
+ child.identifier = _Identifier.SOURCE_CSV
+ child.delimiter_line = line_delimiter
+ child.delimiter_field = field_delimiter
+ child.path = path
+ child.types = types
self._sources.append(child)
return child_set
@@ -89,10 +90,10 @@ class Environment(object):
:param path: The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
:return: A DataSet that represents the data read from the given file as text lines.
"""
- child = dict()
+ child = OperationInfo()
child_set = DataSet(self, child)
- child[_Fields.IDENTIFIER] = _Identifier.SOURCE_TEXT
- child[_Fields.PATH] = path
+ child.identifier = _Identifier.SOURCE_TEXT
+ child.path = path
self._sources.append(child)
return child_set
@@ -106,10 +107,10 @@ class Environment(object):
:param elements: The elements to make up the data set.
:return: A DataSet representing the given list of elements.
"""
- child = dict()
+ child = OperationInfo()
child_set = DataSet(self, child)
- child[_Fields.IDENTIFIER] = _Identifier.SOURCE_VALUE
- child[_Fields.VALUES] = elements
+ child.identifier = _Identifier.SOURCE_VALUE
+ child.values = elements
self._sources.append(child)
return child_set
@@ -155,10 +156,10 @@ class Environment(object):
operator = None
for set in self._sets:
- if set[_Fields.ID] == id:
- operator = set[_Fields.OPERATOR]
- if set[_Fields.ID] == -id:
- operator = set[_Fields.COMBINEOP]
+ if set.id == id:
+ operator = set.operator
+ if set.id == -id:
+ operator = set.combineop
operator._configure(input_path, output_path, port, self)
operator._go()
sys.stdout.flush()
@@ -183,55 +184,55 @@ class Environment(object):
x = len(self._sets) - 1
while x > -1:
child = self._sets[x]
- child_type = child[_Fields.IDENTIFIER]
+ child_type = child.identifier
if child_type in chainable:
- parent = child[_Fields.PARENT]
- parent_type = parent[_Fields.IDENTIFIER]
- if len(parent[_Fields.SINKS]) == 0:
+ parent = child.parent
+ parent_type = parent.identifier
+ if len(parent.sinks) == 0:
if child_type == _Identifier.GROUPREDUCE or child_type == _Identifier.REDUCE:
- if child[_Fields.COMBINE]:
+ if child.combine:
while parent_type == _Identifier.GROUP or parent_type == _Identifier.SORT:
- parent = parent[_Fields.PARENT]
- parent_type = parent[_Fields.IDENTIFIER]
- if parent_type in udf and len(parent[_Fields.CHILDREN]) == 1:
- if parent[_Fields.OPERATOR] is not None:
- function = child[_Fields.COMBINEOP]
- parent[_Fields.OPERATOR]._chain(function)
- child[_Fields.COMBINE] = False
- parent[_Fields.NAME] += " -> PythonCombine"
- for bcvar in child[_Fields.BCVARS]:
+ parent = parent.parent
+ parent_type = parent.identifier
+ if parent_type in udf and len(parent.children) == 1:
+ if parent.operator is not None:
+ function = child.combineop
+ parent.operator._chain(function)
+ child.combine = False
+ parent.name += " -> PythonCombine"
+ for bcvar in child.bcvars:
bcvar_copy = copy.deepcopy(bcvar)
- bcvar_copy[_Fields.PARENT] = parent
+ bcvar_copy.parent = parent
self._broadcast.append(bcvar_copy)
else:
- if parent_type in udf and len(parent[_Fields.CHILDREN]) == 1:
- parent_op = parent[_Fields.OPERATOR]
+ if parent_type in udf and len(parent.children) == 1:
+ parent_op = parent.operator
if parent_op is not None:
- function = child[_Fields.OPERATOR]
+ function = child.operator
parent_op._chain(function)
- parent[_Fields.NAME] += " -> " + child[_Fields.NAME]
- parent[_Fields.TYPES] = child[_Fields.TYPES]
- for grand_child in child[_Fields.CHILDREN]:
- if grand_child[_Fields.IDENTIFIER] in multi_input:
- if grand_child[_Fields.PARENT][_Fields.ID] == child[_Fields.ID]:
- grand_child[_Fields.PARENT] = parent
+ parent.name += " -> " + child.name
+ parent.types = child.types
+ for grand_child in child.children:
+ if grand_child.identifier in multi_input:
+ if grand_child.parent.id == child.id:
+ grand_child.parent = parent
else:
- grand_child[_Fields.OTHER] = parent
+ grand_child.other = parent
else:
- grand_child[_Fields.PARENT] = parent
- parent[_Fields.CHILDREN].append(grand_child)
- parent[_Fields.CHILDREN].remove(child)
- for sink in child[_Fields.SINKS]:
- sink[_Fields.PARENT] = parent
- parent[_Fields.SINKS].append(sink)
- for bcvar in child[_Fields.BCVARS]:
- bcvar[_Fields.PARENT] = parent
- parent[_Fields.BCVARS].append(bcvar)
+ grand_child.parent = parent
+ parent.children.append(grand_child)
+ parent.children.remove(child)
+ for sink in child.sinks:
+ sink.parent = parent
+ parent.sinks.append(sink)
+ for bcvar in child.bcvars:
+ bcvar.parent = parent
+ parent.bcvars.append(bcvar)
self._remove_set((child))
x -= 1
def _remove_set(self, set):
- self._sets[:] = [s for s in self._sets if s[_Fields.ID]!=set[_Fields.ID]]
+ self._sets[:] = [s for s in self._sets if s.id!=set.id]
def _send_plan(self):
self._send_parameters()
@@ -248,111 +249,111 @@ class Environment(object):
def _send_sources(self):
for source in self._sources:
- identifier = source[_Fields.IDENTIFIER]
+ identifier = source.identifier
collect = self._collector.collect
collect(identifier)
- collect(source[_Fields.ID])
+ collect(source.id)
for case in Switch(identifier):
if case(_Identifier.SOURCE_CSV):
- collect(source[_Fields.PATH])
- collect(source[_Fields.DELIMITER_FIELD])
- collect(source[_Fields.DELIMITER_LINE])
- collect(source[_Fields.TYPES])
+ collect(source.path)
+ collect(source.delimiter_field)
+ collect(source.delimiter_line)
+ collect(source.types)
break
if case(_Identifier.SOURCE_TEXT):
- collect(source[_Fields.PATH])
+ collect(source.path)
break
if case(_Identifier.SOURCE_VALUE):
- collect(len(source[_Fields.VALUES]))
- for value in source[_Fields.VALUES]:
+ collect(len(source.values))
+ for value in source.values:
collect(value)
break
def _send_operations(self):
collect = self._collector.collect
for set in self._sets:
- identifier = set.get(_Fields.IDENTIFIER)
- collect(set[_Fields.IDENTIFIER])
- collect(set[_Fields.ID])
- collect(set[_Fields.PARENT][_Fields.ID])
+ identifier = set.identifier
+ collect(set.identifier)
+ collect(set.id)
+ collect(set.parent.id)
for case in Switch(identifier):
if case(_Identifier.SORT):
- collect(set[_Fields.FIELD])
- collect(set[_Fields.ORDER])
+ collect(set.field)
+ collect(set.order)
break
if case(_Identifier.GROUP):
- collect(set[_Fields.KEYS])
+ collect(set.keys)
break
if case(_Identifier.COGROUP):
- collect(set[_Fields.OTHER][_Fields.ID])
- collect(set[_Fields.KEY1])
- collect(set[_Fields.KEY2])
- collect(set[_Fields.TYPES])
- collect(set[_Fields.NAME])
+ collect(set.other.id)
+ collect(set.key1)
+ collect(set.key2)
+ collect(set.types)
+ collect(set.name)
break
if case(_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST):
- collect(set[_Fields.OTHER][_Fields.ID])
- collect(set[_Fields.TYPES])
- collect(len(set[_Fields.PROJECTIONS]))
- for p in set[_Fields.PROJECTIONS]:
+ collect(set.other.id)
+ collect(set.types)
+ collect(len(set.projections))
+ for p in set.projections:
collect(p[0])
collect(p[1])
- collect(set[_Fields.NAME])
+ collect(set.name)
break
if case(_Identifier.REDUCE, _Identifier.GROUPREDUCE):
- collect(set[_Fields.TYPES])
- collect(set[_Fields.COMBINE])
- collect(set[_Fields.NAME])
+ collect(set.types)
+ collect(set.combine)
+ collect(set.name)
break
if case(_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT):
- collect(set[_Fields.KEY1])
- collect(set[_Fields.KEY2])
- collect(set[_Fields.OTHER][_Fields.ID])
- collect(set[_Fields.TYPES])
- collect(len(set[_Fields.PROJECTIONS]))
- for p in set[_Fields.PROJECTIONS]:
+ collect(set.key1)
+ collect(set.key2)
+ collect(set.other.id)
+ collect(set.types)
+ collect(len(set.projections))
+ for p in set.projections:
collect(p[0])
collect(p[1])
- collect(set[_Fields.NAME])
+ collect(set.name)
break
if case(_Identifier.MAP, _Identifier.MAPPARTITION, _Identifier.FLATMAP, _Identifier.FILTER):
- collect(set[_Fields.TYPES])
- collect(set[_Fields.NAME])
+ collect(set.types)
+ collect(set.name)
break
if case(_Identifier.UNION):
- collect(set[_Fields.OTHER][_Fields.ID])
+ collect(set.other.id)
break
if case(_Identifier.PROJECTION):
- collect(set[_Fields.KEYS])
+ collect(set.keys)
break
if case():
raise KeyError("Environment._send_child_sets(): Invalid operation identifier: " + str(identifier))
def _send_sinks(self):
for sink in self._sinks:
- identifier = sink[_Fields.IDENTIFIER]
+ identifier = sink.identifier
collect = self._collector.collect
collect(identifier)
- collect(sink[_Fields.PARENT][_Fields.ID])
+ collect(sink.parent.id)
for case in Switch(identifier):
if case(_Identifier.SINK_CSV):
- collect(sink[_Fields.PATH])
- collect(sink[_Fields.DELIMITER_FIELD])
- collect(sink[_Fields.DELIMITER_LINE])
- collect(sink[_Fields.WRITE_MODE])
+ collect(sink.path)
+ collect(sink.delimiter_field)
+ collect(sink.delimiter_line)
+ collect(sink.write_mode)
break;
if case(_Identifier.SINK_TEXT):
- collect(sink[_Fields.PATH])
- collect(sink[_Fields.WRITE_MODE])
+ collect(sink.path)
+ collect(sink.write_mode)
break
if case(_Identifier.SINK_PRINT):
- collect(sink[_Fields.TO_ERR])
+ collect(sink.to_err)
break
def _send_broadcast(self):
collect = self._collector.collect
for entry in self._broadcast:
collect(_Identifier.BROADCAST)
- collect(entry[_Fields.PARENT][_Fields.ID])
- collect(entry[_Fields.OTHER][_Fields.ID])
- collect(entry[_Fields.NAME])
+ collect(entry.parent.id)
+ collect(entry.other.id)
+ collect(entry.name)
http://git-wip-us.apache.org/repos/asf/flink/blob/5d37fe14/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
new file mode 100644
index 0000000..faa2215
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
@@ -0,0 +1,50 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Constants import WriteMode
+
+
+class OperationInfo():
+ def __init__(self, info=None):
+ if info is None:
+ self.parent = None
+ self.other = None
+ self.identifier = None
+ self.field = None
+ self.order = None
+ self.keys = None
+ self.key1 = None
+ self.key2 = None
+ self.types = None
+ self.operator = None
+ self.name = None
+ self.combine = False
+ self.delimiter_line = "\n"
+ self.delimiter_field = ","
+ self.write_mode = WriteMode.NO_OVERWRITE
+ self.sinks = []
+ self.children = []
+ self.combineop = None
+ self.path = None
+ self.values = []
+ self.projections = []
+ self.bcvars = []
+ self.id = None
+ self.to_err = False
+ else:
+ self.__dict__.update(info.__dict__)
+
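The optional info argument gives the class a copy-constructor: OperationInfo(existing) takes over the attribute dict of existing via __dict__.update. It is not exercised anywhere in this diff, and note that the copy is shallow. A hedged sketch with a hypothetical stand-in class of the same __init__ shape:

# Hypothetical stand-in with the same __init__ shape as the committed class.
class InfoSketch(object):
    def __init__(self, info=None):
        if info is None:
            self.children = []
        else:
            self.__dict__.update(info.__dict__)  # shallow copy, as in OperationInfo

a = InfoSketch()
b = InfoSketch(a)
b.children.append("sink")
assert a.children == ["sink"]  # both objects share the same 'children' list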
http://git-wip-us.apache.org/repos/asf/flink/blob/5d37fe14/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
index a3b8d07..b2063eb 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
@@ -16,7 +16,6 @@
# limitations under the License.
################################################################################
from flink.plan.Environment import get_environment
-from flink.plan.Constants import _Fields
from flink.plan.Constants import INT, STRING, BOOL, FLOAT
import sys
@@ -29,28 +28,28 @@ if __name__ == "__main__":
direct_from_source = d1.filter(lambda x:True)
- if direct_from_source._info[_Fields.TYPES] != ("hello", 4, 3.2, True):
+ if direct_from_source._info.types != ("hello", 4, 3.2, True):
sys.exit("Error deducting type directly from source.")
from_common_udf = d1.map(lambda x: x[3], BOOL).filter(lambda x:True)
- if from_common_udf._info[_Fields.TYPES] != BOOL:
+ if from_common_udf._info.types != BOOL:
sys.exit("Error deducting type from common udf.")
through_projection = d1.project(3, 2).filter(lambda x:True)
- if through_projection._info[_Fields.TYPES] != (True, 3.2):
+ if through_projection._info.types != (True, 3.2):
sys.exit("Error deducting type through projection.")
through_default_op = d1.cross(d2).filter(lambda x:True)
- if through_default_op._info[_Fields.TYPES] != (("hello", 4, 3.2, True), "world"):
- sys.exit("Error deducting type through default J/C." +str(through_default_op._info[_Fields.TYPES]))
+ if through_default_op._info.types != (("hello", 4, 3.2, True), "world"):
+ sys.exit("Error deducting type through default J/C." +str(through_default_op._info.types))
through_prj_op = d1.cross(d2).project_first(1, 0).project_second().project_first(3, 2).filter(lambda x:True)
- if through_prj_op._info[_Fields.TYPES] != (4, "hello", "world", True, 3.2):
- sys.exit("Error deducting type through projection J/C. "+str(through_prj_op._info[_Fields.TYPES]))
+ if through_prj_op._info.types != (4, "hello", "world", True, 3.2):
+ sys.exit("Error deducting type through projection J/C. "+str(through_prj_op._info.types))
env = get_environment()