You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:51 UTC
[incubator-wayang] 30/32: [WAYANG-#8] add PyMapOperator and minors
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 8b5048fd79f50b1b50c84e483207e1a45ef811ef
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Fri Apr 8 12:01:53 2022 +0200
[WAYANG-#8] add PyMapOperator and minors
Signed-off-by: bertty <be...@apache.org>
---
python/src/pywy/core/channel.py | 1 +
python/src/pywy/core/plan.py | 4 +-
python/src/pywy/core/translator.py | 2 +-
python/src/pywy/dataquanta.py | 5 ++-
python/src/pywy/graph/graph.py | 8 ++--
python/src/pywy/graph/types.py | 34 +++++++--------
python/src/pywy/operators/base.py | 16 ++++---
python/src/pywy/operators/sink.py | 4 +-
python/src/pywy/operators/unary.py | 7 ----
python/src/pywy/platforms/python/execution.py | 37 +++++++++-------
python/src/pywy/platforms/python/mappings.py | 1 +
.../src/pywy/platforms/python/operator/__init__.py | 4 +-
.../python/operator/py_execution_operator.py | 2 +-
.../platforms/python/operator/py_sink_textfile.py | 12 ++++--
.../pywy/platforms/python/operator/py_unary_map.py | 49 ++++++++++++++++++++++
.../pywy/tests/integration/python_platform_test.py | 31 +++++++++++++-
16 files changed, 155 insertions(+), 62 deletions(-)
diff --git a/python/src/pywy/core/channel.py b/python/src/pywy/core/channel.py
index c10cc09c..e915c98a 100644
--- a/python/src/pywy/core/channel.py
+++ b/python/src/pywy/core/channel.py
@@ -25,3 +25,4 @@ class ChannelDescriptor:
CH_T = TypeVar('CH_T', bound=Channel)
+CHD_T = TypeVar('CHD_T', bound=ChannelDescriptor)
diff --git a/python/src/pywy/core/plan.py b/python/src/pywy/core/plan.py
index 1826f90e..fc8f7360 100644
--- a/python/src/pywy/core/plan.py
+++ b/python/src/pywy/core/plan.py
@@ -35,7 +35,7 @@ class PywyPlan:
)
)
- self.graph.traversal(None, self.graph.starting_nodes, print_plan)
+ self.graph.traversal(self.graph.starting_nodes, print_plan)
def printTuple(self):
def print_plan(current: NodeVec, previous: NodeVec):
@@ -56,4 +56,4 @@ class PywyPlan:
)
)
- self.graph.traversal(None, self.graph.starting_nodes, print_plan)
+ self.graph.traversal(self.graph.starting_nodes, print_plan)
diff --git a/python/src/pywy/core/translator.py b/python/src/pywy/core/translator.py
index 0e0acfcb..72b6f01a 100644
--- a/python/src/pywy/core/translator.py
+++ b/python/src/pywy/core/translator.py
@@ -32,7 +32,7 @@ class Translator:
# TODO the port is not necessarily 0
current_op.current[1].connect(0, next_op.current[1], 0)
- graph.traversal(None, graph.starting_nodes, translate2plugin)
+ graph.traversal(graph.starting_nodes, translate2plugin)
node = []
for elem in graph.starting_nodes:
diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py
index 283286e5..956d815e 100644
--- a/python/src/pywy/dataquanta.py
+++ b/python/src/pywy/dataquanta.py
@@ -1,6 +1,7 @@
from typing import Set, List, cast
from pywy.core import Translator
+from pywy.operators.base import PO_T
from pywy.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableOut, T, In, Out)
from pywy.operators import *
from pywy.core import PywyPlan
@@ -64,7 +65,7 @@ class DataQuanta(GenericTco):
return DataQuanta(self.context, self._connect(FlatmapOperator(f)))
def store_textfile(self: "DataQuanta[In]", path: str):
- last: List[SinkOperator] = [cast(SinkOperator, self._connect(TextFileSink(path)))]
+ last: List[SinkOperator] = [cast(SinkOperator, self._connect(TextFileSink(path, self.operator.outputSlot[0])))]
plan = PywyPlan(self.context.plugins, last)
plug = self.context.plugins.pop()
@@ -73,7 +74,7 @@ class DataQuanta(GenericTco):
plug.get_executor().execute(new_plan)
# TODO add the logic to execute the plan
- def _connect(self, op: PywyOperator, port_op: int = 0) -> PywyOperator:
+ def _connect(self, op: PO_T, port_op: int = 0) -> PywyOperator:
self.operator.connect(0, op, port_op)
return op
diff --git a/python/src/pywy/graph/graph.py b/python/src/pywy/graph/graph.py
index 863e4a6a..c6bbd5b5 100644
--- a/python/src/pywy/graph/graph.py
+++ b/python/src/pywy/graph/graph.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
from pywy.types import T, K
from typing import (Iterable, Dict, Callable, Any, Generic, Optional, List)
@@ -22,7 +24,7 @@ class GraphNode(Generic[K, T]):
if len(adjacent) == 0:
return []
- def wrap(op: T) -> 'GraphNode[K, T]':
+ def wrap(op: T) -> Optional['GraphNode[K, T]'] | None:
if op is None:
return None
if op not in created:
@@ -59,12 +61,12 @@ class WayangGraph(Generic[K, T]):
def traversal(
self,
- origin: GraphNode[K, T],
nodes: Iterable[GraphNode[K, T]],
udf: Callable[[GraphNode[K, T], GraphNode[K, T]], Any],
+ origin: Optional[GraphNode[K, T]] = None,
visit_status: bool = True
):
for node in nodes:
adjacent = node.walk(self.created_nodes)
- self.traversal(node, adjacent, udf, visit_status)
+ self.traversal(adjacent, udf, node, visit_status)
node.visit(origin, udf)
diff --git a/python/src/pywy/graph/types.py b/python/src/pywy/graph/types.py
index bc99efa0..c72876fd 100644
--- a/python/src/pywy/graph/types.py
+++ b/python/src/pywy/graph/types.py
@@ -1,45 +1,45 @@
from typing import (Iterable, List)
from pywy.graph.graph import (GraphNode, WayangGraph)
-from pywy.operators.base import PywyOperator
+from pywy.operators.base import PywyOperator, PO_T
-class NodeOperator(GraphNode[PywyOperator, PywyOperator]):
+class NodeOperator(GraphNode[PO_T, PO_T]):
- def __init__(self, op: PywyOperator):
+ def __init__(self, op: PO_T):
super(NodeOperator, self).__init__(op)
- def get_adjacents(self) -> List[PywyOperator]:
- operator: PywyOperator = self.current
+ def get_adjacents(self) -> List[PO_T]:
+ operator: PO_T = self.current
if operator is None or operator.inputs == 0:
return []
return operator.inputOperator
- def build_node(self, t: PywyOperator) -> 'NodeOperator':
+ def build_node(self, t: PO_T) -> 'NodeOperator':
return NodeOperator(t)
-class WGraphOfOperator(WayangGraph[PywyOperator, NodeOperator]):
+class WGraphOfOperator(WayangGraph[PO_T, NodeOperator]):
- def __init__(self, nodes: Iterable[PywyOperator]):
+ def __init__(self, nodes: Iterable[PO_T]):
super(WGraphOfOperator, self).__init__(nodes)
- def build_node(self, t: PywyOperator) -> NodeOperator:
+ def build_node(self, t: PO_T) -> NodeOperator:
return NodeOperator(t)
-class NodeVec(GraphNode[PywyOperator, List[PywyOperator]]):
+class NodeVec(GraphNode[PO_T, List[PO_T]]):
- def __init__(self, op: PywyOperator):
+ def __init__(self, op: PO_T):
super(NodeVec, self).__init__([op, None])
- def get_adjacents(self) -> List[PywyOperator]:
- operator: PywyOperator = self.current[0]
+ def get_adjacents(self) -> List[PO_T]:
+ operator: PO_T = self.current[0]
if operator is None or operator.inputs == 0:
return []
return operator.inputOperator
- def build_node(self, t: PywyOperator) -> 'NodeVec':
+ def build_node(self, t: PO_T) -> 'NodeVec':
return NodeVec(t)
def __str__(self):
@@ -49,10 +49,10 @@ class NodeVec(GraphNode[PywyOperator, List[PywyOperator]]):
return self.__str__()
-class WGraphOfVec(WayangGraph[PywyOperator, NodeVec]):
+class WGraphOfVec(WayangGraph[PO_T, NodeVec]):
- def __init__(self, nodes: Iterable[PywyOperator]):
+ def __init__(self, nodes: Iterable[PO_T]):
super(WGraphOfVec, self).__init__(nodes)
- def build_node(self, t: PywyOperator) -> NodeVec:
+ def build_node(self, t: PO_T) -> NodeVec:
return NodeVec(t)
diff --git a/python/src/pywy/operators/base.py b/python/src/pywy/operators/base.py
index 3b5cca9e..c8c4b6bf 100644
--- a/python/src/pywy/operators/base.py
+++ b/python/src/pywy/operators/base.py
@@ -1,17 +1,18 @@
from typing import (TypeVar, Optional, List, Set)
-from pywy.core import ChannelDescriptor, Channel
+from pywy.core import ChannelDescriptor
+from pywy.core.channel import CH_T, CHD_T
class PywyOperator:
inputSlot: List[TypeVar]
- inputChannel: List[Channel]
- inputChannelDescriptor: List[ChannelDescriptor]
+ inputChannel: List[CH_T]
+ inputChannelDescriptor: List[CHD_T]
inputOperator: List['PywyOperator']
inputs: int
outputSlot: List[TypeVar]
- outputChannel: List[Channel]
- outputChannelDescriptor: List[ChannelDescriptor]
+ outputChannel: List[CH_T]
+ outputChannelDescriptor: List[CHD_T]
outputOperator: List['PywyOperator']
outputs: int
@@ -54,7 +55,7 @@ class PywyOperator:
self.validate_inputs(inputs)
self.validate_outputs(outputs)
- def connect(self, port: int, that: 'PywyOperator', port_that: int):
+ def connect(self, port: int, that: 'PO_T', port_that: int):
self.outputOperator[port] = that
that.inputOperator[port_that] = self
@@ -86,3 +87,6 @@ class PywyOperator:
def __repr__(self):
return self.__str__()
+
+
+PO_T = TypeVar('PO_T', bound=PywyOperator)
diff --git a/python/src/pywy/operators/sink.py b/python/src/pywy/operators/sink.py
index 6b13b676..1f78a63c 100644
--- a/python/src/pywy/operators/sink.py
+++ b/python/src/pywy/operators/sink.py
@@ -26,8 +26,8 @@ class TextFileSink(SinkUnaryOperator):
path: str
- def __init__(self, path: str):
- super().__init__('TextFile')
+ def __init__(self, path: str, input_type: GenericTco):
+ super().__init__('TextFile', input_type)
self.path = path
def __str__(self):
diff --git a/python/src/pywy/operators/unary.py b/python/src/pywy/operators/unary.py
index cfd585f1..03d6f118 100644
--- a/python/src/pywy/operators/unary.py
+++ b/python/src/pywy/operators/unary.py
@@ -52,13 +52,6 @@ class MapOperator(UnaryToUnaryOperator):
super().__init__("Map", types[0], types[1])
self.function = function
- # TODO remove wrapper
- def getWrapper(self):
- udf = self.function
- def func(iterator):
- return map(udf, iterator)
- return func
-
def __str__(self):
return super().__str__()
diff --git a/python/src/pywy/platforms/python/execution.py b/python/src/pywy/platforms/python/execution.py
index 7f8803eb..bab62ecc 100644
--- a/python/src/pywy/platforms/python/execution.py
+++ b/python/src/pywy/platforms/python/execution.py
@@ -1,9 +1,8 @@
-from typing import List
-
from pywy.graph.types import WGraphOfOperator, NodeOperator
-from pywy.core import Channel
+from pywy.core import ChannelDescriptor
from pywy.core import Executor
from pywy.core import PywyPlan
+from pywy.platforms.python.channels import PY_ITERATOR_CHANNEL_DESCRIPTOR
from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
@@ -16,7 +15,10 @@ class PyExecutor(Executor):
pywyPlan: PywyPlan = plan
graph = WGraphOfOperator(pywyPlan.sinks)
- def exec(op_current: NodeOperator, op_next: NodeOperator):
+ # TODO get this information from a configuration and ideally from the context
+ descriptor_default: ChannelDescriptor = PY_ITERATOR_CHANNEL_DESCRIPTOR
+
+ def execute(op_current: NodeOperator, op_next: NodeOperator):
if op_current is None:
return
@@ -42,21 +44,26 @@ class PyExecutor(Executor):
inputs
)
)
+
if len(intersect) > 1:
- raise Exception(
- "The interaction between the operator (A) {} and (B) {}, "
- "can't be decided because are several channel availables {}".format(
- py_current,
- py_next,
- intersect
+ if descriptor_default is None:
+ raise Exception(
+ "The interaction between the operator (A) {} and (B) {}, "
+ "can't be decided because are several channel availables {}".format(
+ py_current,
+ py_next,
+ intersect
+ )
)
- )
- #TODO validate if is valite for several output
- py_current.outputChannel: List[Channel] = [intersect.pop().create_instance()]
+ descriptor = descriptor_default
+ else:
+ descriptor = intersect.pop()
+
+ # TODO validate if it is valid for several outputs
+ py_current.outputChannel[0] = descriptor.create_instance()
py_current.execute(py_current.inputChannel, py_current.outputChannel)
py_next.inputChannel = py_current.outputChannel
-
- graph.traversal(None, graph.starting_nodes, exec)
+ graph.traversal(graph.starting_nodes, execute)
diff --git a/python/src/pywy/platforms/python/mappings.py b/python/src/pywy/platforms/python/mappings.py
index 9adc062a..50a6ddeb 100644
--- a/python/src/pywy/platforms/python/mappings.py
+++ b/python/src/pywy/platforms/python/mappings.py
@@ -7,4 +7,5 @@ PYWY_OPERATOR_MAPPINGS = Mapping()
PYWY_OPERATOR_MAPPINGS.add_mapping(PyFilterOperator())
PYWY_OPERATOR_MAPPINGS.add_mapping(PyTextFileSourceOperator())
PYWY_OPERATOR_MAPPINGS.add_mapping(PyTextFileSinkOperator())
+PYWY_OPERATOR_MAPPINGS.add_mapping(PyMapOperator())
diff --git a/python/src/pywy/platforms/python/operator/__init__.py b/python/src/pywy/platforms/python/operator/__init__.py
index ef9d94b8..51b6f409 100644
--- a/python/src/pywy/platforms/python/operator/__init__.py
+++ b/python/src/pywy/platforms/python/operator/__init__.py
@@ -1,5 +1,6 @@
from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
from pywy.platforms.python.operator.py_unary_filter import PyFilterOperator
+from pywy.platforms.python.operator.py_unary_map import PyMapOperator
from pywy.platforms.python.operator.py_source_textfile import PyTextFileSourceOperator
from pywy.platforms.python.operator.py_sink_textfile import PyTextFileSinkOperator
@@ -7,5 +8,6 @@ __ALL__ = [
PyExecutionOperator,
PyFilterOperator,
PyTextFileSourceOperator,
- PyTextFileSinkOperator
+ PyTextFileSinkOperator,
+ PyMapOperator,
]
diff --git a/python/src/pywy/platforms/python/operator/py_execution_operator.py b/python/src/pywy/platforms/python/operator/py_execution_operator.py
index 9b5f1a75..c63e88b5 100644
--- a/python/src/pywy/platforms/python/operator/py_execution_operator.py
+++ b/python/src/pywy/platforms/python/operator/py_execution_operator.py
@@ -9,5 +9,5 @@ class PyExecutionOperator(PywyOperator):
def prefix(self) -> str:
return 'Py'
- def execute(self, inputs: List[Type[CH_T]], output: List[Type[CH_T]]):
+ def execute(self, inputs: List[Type[CH_T]], output: List[CH_T]):
pass
diff --git a/python/src/pywy/platforms/python/operator/py_sink_textfile.py b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
index ab5f5af7..801387cd 100644
--- a/python/src/pywy/platforms/python/operator/py_sink_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
@@ -14,8 +14,8 @@ class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator):
def __init__(self, origin: TextFileSink = None):
path = None if origin is None else origin.path
- super().__init__(path)
- pass
+ type_class = None if origin is None else origin.inputSlot[0]
+ super().__init__(path, type_class)
def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
self.validate_channels(inputs, outputs)
@@ -23,8 +23,12 @@ class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator):
file = open(self.path, 'w')
py_in_iter_channel: PyIteratorChannel = inputs[0]
iterable = py_in_iter_channel.provide_iterable()
- for element in iterable:
- file.write(str(element))
+ if self.inputSlot[0] == str:
+ for element in iterable:
+ file.write(element)
+ else:
+ for element in iterable:
+ file.write("{}\n".format(str(element)))
file.close()
else:
diff --git a/python/src/pywy/platforms/python/operator/py_unary_map.py b/python/src/pywy/platforms/python/operator/py_unary_map.py
new file mode 100644
index 00000000..bf6ef3f0
--- /dev/null
+++ b/python/src/pywy/platforms/python/operator/py_unary_map.py
@@ -0,0 +1,49 @@
+from typing import Set, List, Type
+
+from pywy.core.channel import CH_T
+from pywy.operators.unary import MapOperator
+from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
+from pywy.platforms.python.channels import (
+ ChannelDescriptor,
+ PyIteratorChannel,
+ PY_ITERATOR_CHANNEL_DESCRIPTOR,
+ PY_CALLABLE_CHANNEL_DESCRIPTOR,
+ PyCallableChannel
+ )
+
+
+class PyMapOperator(MapOperator, PyExecutionOperator):
+
+ def __init__(self, origin: MapOperator = None):
+ function = None if origin is None else origin.function
+ super().__init__(function)
+ pass
+
+ def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
+ self.validate_channels(inputs, outputs)
+ udf = self.function
+ if isinstance(inputs[0], PyIteratorChannel):
+ py_in_iter_channel: PyIteratorChannel = inputs[0]
+ py_out_iter_channel: PyIteratorChannel = outputs[0]
+ py_out_iter_channel.accept_iterable(map(udf, py_in_iter_channel.provide_iterable()))
+ elif isinstance(inputs[0], PyCallableChannel):
+ py_in_call_channel: PyCallableChannel = inputs[0]
+ py_out_call_channel: PyCallableChannel = outputs[0]
+
+ def func(iterator):
+ return map(udf, iterator)
+
+ py_out_call_channel.accept_callable(
+ PyCallableChannel.concatenate(
+ func,
+ py_in_call_channel.provide_callable()
+ )
+ )
+ else:
+ raise Exception("Channel Type does not supported")
+
+ def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
+ return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+
+ def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
+ return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/tests/integration/python_platform_test.py b/python/src/pywy/tests/integration/python_platform_test.py
index 64a7510d..d2ec4f22 100644
--- a/python/src/pywy/tests/integration/python_platform_test.py
+++ b/python/src/pywy/tests/integration/python_platform_test.py
@@ -1,7 +1,6 @@
import os
import unittest
import tempfile
-from os import fdopen
from typing import List
from pywy.config import RC_TEST_DIR as ROOT
@@ -42,3 +41,33 @@ class TestIntegrationPythonPlatform(unittest.TestCase):
self.assertEqual(selectivity, elements)
self.assertEqual(lines_filter, lines_platform)
+
+ def test_dummy_map(self):
+ def pre(a: str) -> bool:
+ return 'six' in a
+
+ def convert(a: str) -> int:
+ return len(a)
+
+ fd, path_tmp = tempfile.mkstemp()
+
+ WayangContext() \
+ .register(PYTHON) \
+ .textfile(self.file_10e0) \
+ .filter(pre) \
+ .map(convert) \
+ .store_textfile(path_tmp)
+
+ lines_filter: List[int]
+ with open(self.file_10e0, 'r') as f:
+ lines_filter = list(map(convert, filter(pre, f.readlines())))
+ selectivity = len(list(lines_filter))
+
+ lines_platform: List[int]
+ with open(path_tmp, 'r') as fp:
+ lines_platform = list(map(lambda x: int(x), fp.readlines()))
+ elements = len(lines_platform)
+ os.remove(path_tmp)
+
+ self.assertEqual(selectivity, elements)
+ self.assertEqual(lines_filter, lines_platform)