You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:51 UTC

[incubator-wayang] 30/32: [WAYANG-#8] add PyMapOperator and minors

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 8b5048fd79f50b1b50c84e483207e1a45ef811ef
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Fri Apr 8 12:01:53 2022 +0200

    [WAYANG-#8] add PyMapOperator and minors
    
    Signed-off-by: bertty <be...@apache.org>
---
 python/src/pywy/core/channel.py                    |  1 +
 python/src/pywy/core/plan.py                       |  4 +-
 python/src/pywy/core/translator.py                 |  2 +-
 python/src/pywy/dataquanta.py                      |  5 ++-
 python/src/pywy/graph/graph.py                     |  8 ++--
 python/src/pywy/graph/types.py                     | 34 +++++++--------
 python/src/pywy/operators/base.py                  | 16 ++++---
 python/src/pywy/operators/sink.py                  |  4 +-
 python/src/pywy/operators/unary.py                 |  7 ----
 python/src/pywy/platforms/python/execution.py      | 37 +++++++++-------
 python/src/pywy/platforms/python/mappings.py       |  1 +
 .../src/pywy/platforms/python/operator/__init__.py |  4 +-
 .../python/operator/py_execution_operator.py       |  2 +-
 .../platforms/python/operator/py_sink_textfile.py  | 12 ++++--
 .../pywy/platforms/python/operator/py_unary_map.py | 49 ++++++++++++++++++++++
 .../pywy/tests/integration/python_platform_test.py | 31 +++++++++++++-
 16 files changed, 155 insertions(+), 62 deletions(-)

diff --git a/python/src/pywy/core/channel.py b/python/src/pywy/core/channel.py
index c10cc09c..e915c98a 100644
--- a/python/src/pywy/core/channel.py
+++ b/python/src/pywy/core/channel.py
@@ -25,3 +25,4 @@ class ChannelDescriptor:
 
 
 CH_T = TypeVar('CH_T', bound=Channel)
+CHD_T = TypeVar('CHD_T', bound=ChannelDescriptor)
diff --git a/python/src/pywy/core/plan.py b/python/src/pywy/core/plan.py
index 1826f90e..fc8f7360 100644
--- a/python/src/pywy/core/plan.py
+++ b/python/src/pywy/core/plan.py
@@ -35,7 +35,7 @@ class PywyPlan:
                 )
             )
 
-        self.graph.traversal(None, self.graph.starting_nodes, print_plan)
+        self.graph.traversal(self.graph.starting_nodes, print_plan)
 
     def printTuple(self):
         def print_plan(current: NodeVec, previous: NodeVec):
@@ -56,4 +56,4 @@ class PywyPlan:
                 )
             )
 
-        self.graph.traversal(None, self.graph.starting_nodes, print_plan)
+        self.graph.traversal(self.graph.starting_nodes, print_plan)
diff --git a/python/src/pywy/core/translator.py b/python/src/pywy/core/translator.py
index 0e0acfcb..72b6f01a 100644
--- a/python/src/pywy/core/translator.py
+++ b/python/src/pywy/core/translator.py
@@ -32,7 +32,7 @@ class Translator:
             # TODO not necesary it it 0
             current_op.current[1].connect(0, next_op.current[1], 0)
 
-        graph.traversal(None, graph.starting_nodes, translate2plugin)
+        graph.traversal(graph.starting_nodes, translate2plugin)
 
         node = []
         for elem in graph.starting_nodes:
diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py
index 283286e5..956d815e 100644
--- a/python/src/pywy/dataquanta.py
+++ b/python/src/pywy/dataquanta.py
@@ -1,6 +1,7 @@
 from typing import Set, List, cast
 
 from pywy.core import Translator
+from pywy.operators.base import PO_T
 from pywy.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableOut, T, In, Out)
 from pywy.operators import *
 from pywy.core import PywyPlan
@@ -64,7 +65,7 @@ class DataQuanta(GenericTco):
         return DataQuanta(self.context, self._connect(FlatmapOperator(f)))
 
     def store_textfile(self: "DataQuanta[In]", path: str):
-        last: List[SinkOperator] = [cast(SinkOperator, self._connect(TextFileSink(path)))]
+        last: List[SinkOperator] = [cast(SinkOperator, self._connect(TextFileSink(path, self.operator.outputSlot[0])))]
         plan = PywyPlan(self.context.plugins, last)
 
         plug = self.context.plugins.pop()
@@ -73,7 +74,7 @@ class DataQuanta(GenericTco):
         plug.get_executor().execute(new_plan)
         # TODO add the logic to execute the plan
 
-    def _connect(self, op: PywyOperator, port_op: int = 0) -> PywyOperator:
+    def _connect(self, op: PO_T, port_op: int = 0) -> PywyOperator:
         self.operator.connect(0, op, port_op)
         return op
 
diff --git a/python/src/pywy/graph/graph.py b/python/src/pywy/graph/graph.py
index 863e4a6a..c6bbd5b5 100644
--- a/python/src/pywy/graph/graph.py
+++ b/python/src/pywy/graph/graph.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 from pywy.types import T, K
 from typing import (Iterable, Dict, Callable, Any, Generic, Optional, List)
 
@@ -22,7 +24,7 @@ class GraphNode(Generic[K, T]):
         if len(adjacent) == 0:
             return []
 
-        def wrap(op: T) -> 'GraphNode[K, T]':
+        def wrap(op: T) -> Optional['GraphNode[K, T]'] | None:
             if op is None:
                 return None
             if op not in created:
@@ -59,12 +61,12 @@ class WayangGraph(Generic[K, T]):
 
     def traversal(
             self,
-            origin: GraphNode[K, T],
             nodes: Iterable[GraphNode[K, T]],
             udf: Callable[[GraphNode[K, T], GraphNode[K, T]], Any],
+            origin: Optional[GraphNode[K, T]] = None,
             visit_status: bool = True
     ):
         for node in nodes:
             adjacent = node.walk(self.created_nodes)
-            self.traversal(node, adjacent, udf, visit_status)
+            self.traversal(adjacent, udf, node, visit_status)
             node.visit(origin, udf)
diff --git a/python/src/pywy/graph/types.py b/python/src/pywy/graph/types.py
index bc99efa0..c72876fd 100644
--- a/python/src/pywy/graph/types.py
+++ b/python/src/pywy/graph/types.py
@@ -1,45 +1,45 @@
 from typing import (Iterable, List)
 
 from pywy.graph.graph import (GraphNode, WayangGraph)
-from pywy.operators.base import PywyOperator
+from pywy.operators.base import PywyOperator, PO_T
 
 
-class NodeOperator(GraphNode[PywyOperator, PywyOperator]):
+class NodeOperator(GraphNode[PO_T, PO_T]):
 
-    def __init__(self, op: PywyOperator):
+    def __init__(self, op: PO_T):
         super(NodeOperator, self).__init__(op)
 
-    def get_adjacents(self) -> List[PywyOperator]:
-        operator: PywyOperator = self.current
+    def get_adjacents(self) -> List[PO_T]:
+        operator: PO_T = self.current
         if operator is None or operator.inputs == 0:
             return []
         return operator.inputOperator
 
-    def build_node(self, t: PywyOperator) -> 'NodeOperator':
+    def build_node(self, t: PO_T) -> 'NodeOperator':
         return NodeOperator(t)
 
 
-class WGraphOfOperator(WayangGraph[PywyOperator, NodeOperator]):
+class WGraphOfOperator(WayangGraph[PO_T, NodeOperator]):
 
-    def __init__(self, nodes: Iterable[PywyOperator]):
+    def __init__(self, nodes: Iterable[PO_T]):
         super(WGraphOfOperator, self).__init__(nodes)
 
-    def build_node(self, t: PywyOperator) -> NodeOperator:
+    def build_node(self, t: PO_T) -> NodeOperator:
         return NodeOperator(t)
 
 
-class NodeVec(GraphNode[PywyOperator, List[PywyOperator]]):
+class NodeVec(GraphNode[PO_T, List[PO_T]]):
 
-    def __init__(self, op: PywyOperator):
+    def __init__(self, op: PO_T):
         super(NodeVec, self).__init__([op, None])
 
-    def get_adjacents(self) -> List[PywyOperator]:
-        operator: PywyOperator = self.current[0]
+    def get_adjacents(self) -> List[PO_T]:
+        operator: PO_T = self.current[0]
         if operator is None or operator.inputs == 0:
             return []
         return operator.inputOperator
 
-    def build_node(self, t: PywyOperator) -> 'NodeVec':
+    def build_node(self, t: PO_T) -> 'NodeVec':
         return NodeVec(t)
 
     def __str__(self):
@@ -49,10 +49,10 @@ class NodeVec(GraphNode[PywyOperator, List[PywyOperator]]):
         return self.__str__()
 
 
-class WGraphOfVec(WayangGraph[PywyOperator, NodeVec]):
+class WGraphOfVec(WayangGraph[PO_T, NodeVec]):
 
-    def __init__(self, nodes: Iterable[PywyOperator]):
+    def __init__(self, nodes: Iterable[PO_T]):
         super(WGraphOfVec, self).__init__(nodes)
 
-    def build_node(self, t: PywyOperator) -> NodeVec:
+    def build_node(self, t: PO_T) -> NodeVec:
         return NodeVec(t)
diff --git a/python/src/pywy/operators/base.py b/python/src/pywy/operators/base.py
index 3b5cca9e..c8c4b6bf 100644
--- a/python/src/pywy/operators/base.py
+++ b/python/src/pywy/operators/base.py
@@ -1,17 +1,18 @@
 from typing import (TypeVar, Optional, List, Set)
-from pywy.core import ChannelDescriptor, Channel
+from pywy.core import ChannelDescriptor
+from pywy.core.channel import CH_T, CHD_T
 
 
 class PywyOperator:
 
     inputSlot: List[TypeVar]
-    inputChannel: List[Channel]
-    inputChannelDescriptor: List[ChannelDescriptor]
+    inputChannel: List[CH_T]
+    inputChannelDescriptor: List[CHD_T]
     inputOperator: List['PywyOperator']
     inputs: int
     outputSlot: List[TypeVar]
-    outputChannel: List[Channel]
-    outputChannelDescriptor: List[ChannelDescriptor]
+    outputChannel: List[CH_T]
+    outputChannelDescriptor: List[CHD_T]
     outputOperator: List['PywyOperator']
     outputs: int
 
@@ -54,7 +55,7 @@ class PywyOperator:
         self.validate_inputs(inputs)
         self.validate_outputs(outputs)
 
-    def connect(self, port: int, that: 'PywyOperator', port_that: int):
+    def connect(self, port: int, that: 'PO_T', port_that: int):
         self.outputOperator[port] = that
         that.inputOperator[port_that] = self
 
@@ -86,3 +87,6 @@ class PywyOperator:
 
     def __repr__(self):
         return self.__str__()
+
+
+PO_T = TypeVar('PO_T', bound=PywyOperator)
diff --git a/python/src/pywy/operators/sink.py b/python/src/pywy/operators/sink.py
index 6b13b676..1f78a63c 100644
--- a/python/src/pywy/operators/sink.py
+++ b/python/src/pywy/operators/sink.py
@@ -26,8 +26,8 @@ class TextFileSink(SinkUnaryOperator):
 
     path: str
 
-    def __init__(self, path: str):
-        super().__init__('TextFile')
+    def __init__(self, path: str, input_type: GenericTco):
+        super().__init__('TextFile', input_type)
         self.path = path
 
     def __str__(self):
diff --git a/python/src/pywy/operators/unary.py b/python/src/pywy/operators/unary.py
index cfd585f1..03d6f118 100644
--- a/python/src/pywy/operators/unary.py
+++ b/python/src/pywy/operators/unary.py
@@ -52,13 +52,6 @@ class MapOperator(UnaryToUnaryOperator):
         super().__init__("Map", types[0], types[1])
         self.function = function
 
-    # TODO remove wrapper
-    def getWrapper(self):
-        udf = self.function
-        def func(iterator):
-            return map(udf, iterator)
-        return func
-
     def __str__(self):
         return super().__str__()
 
diff --git a/python/src/pywy/platforms/python/execution.py b/python/src/pywy/platforms/python/execution.py
index 7f8803eb..bab62ecc 100644
--- a/python/src/pywy/platforms/python/execution.py
+++ b/python/src/pywy/platforms/python/execution.py
@@ -1,9 +1,8 @@
-from typing import List
-
 from pywy.graph.types import WGraphOfOperator, NodeOperator
-from pywy.core import Channel
+from pywy.core import ChannelDescriptor
 from pywy.core import Executor
 from pywy.core import PywyPlan
+from pywy.platforms.python.channels import PY_ITERATOR_CHANNEL_DESCRIPTOR
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
 
 
@@ -16,7 +15,10 @@ class PyExecutor(Executor):
         pywyPlan: PywyPlan = plan
         graph = WGraphOfOperator(pywyPlan.sinks)
 
-        def exec(op_current: NodeOperator, op_next: NodeOperator):
+        # TODO get this information by a configuration and ideally by the context
+        descriptor_default: ChannelDescriptor = PY_ITERATOR_CHANNEL_DESCRIPTOR
+
+        def execute(op_current: NodeOperator, op_next: NodeOperator):
             if op_current is None:
                 return
 
@@ -42,21 +44,26 @@ class PyExecutor(Executor):
                         inputs
                     )
                 )
+
             if len(intersect) > 1:
-                raise Exception(
-                    "The interaction between the operator (A) {} and (B) {}, "
-                    "can't be decided because are several channel availables {}".format(
-                        py_current,
-                        py_next,
-                        intersect
+                if descriptor_default is None:
+                    raise Exception(
+                        "The interaction between the operator (A) {} and (B) {}, "
+                        "can't be decided because are several channel availables {}".format(
+                            py_current,
+                            py_next,
+                            intersect
+                        )
                     )
-                )
-            #TODO validate if is valite for several output
-            py_current.outputChannel: List[Channel] = [intersect.pop().create_instance()]
+                descriptor = descriptor_default
+            else:
+                descriptor = intersect.pop()
+
+            # TODO validate whether this is valid for several outputs
+            py_current.outputChannel[0] = descriptor.create_instance()
 
             py_current.execute(py_current.inputChannel, py_current.outputChannel)
 
             py_next.inputChannel = py_current.outputChannel
 
-
-        graph.traversal(None, graph.starting_nodes, exec)
+        graph.traversal(graph.starting_nodes, execute)
diff --git a/python/src/pywy/platforms/python/mappings.py b/python/src/pywy/platforms/python/mappings.py
index 9adc062a..50a6ddeb 100644
--- a/python/src/pywy/platforms/python/mappings.py
+++ b/python/src/pywy/platforms/python/mappings.py
@@ -7,4 +7,5 @@ PYWY_OPERATOR_MAPPINGS = Mapping()
 PYWY_OPERATOR_MAPPINGS.add_mapping(PyFilterOperator())
 PYWY_OPERATOR_MAPPINGS.add_mapping(PyTextFileSourceOperator())
 PYWY_OPERATOR_MAPPINGS.add_mapping(PyTextFileSinkOperator())
+PYWY_OPERATOR_MAPPINGS.add_mapping(PyMapOperator())
 
diff --git a/python/src/pywy/platforms/python/operator/__init__.py b/python/src/pywy/platforms/python/operator/__init__.py
index ef9d94b8..51b6f409 100644
--- a/python/src/pywy/platforms/python/operator/__init__.py
+++ b/python/src/pywy/platforms/python/operator/__init__.py
@@ -1,5 +1,6 @@
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
 from pywy.platforms.python.operator.py_unary_filter import PyFilterOperator
+from pywy.platforms.python.operator.py_unary_map import PyMapOperator
 from pywy.platforms.python.operator.py_source_textfile import PyTextFileSourceOperator
 from pywy.platforms.python.operator.py_sink_textfile import PyTextFileSinkOperator
 
@@ -7,5 +8,6 @@ __ALL__ = [
     PyExecutionOperator,
     PyFilterOperator,
     PyTextFileSourceOperator,
-    PyTextFileSinkOperator
+    PyTextFileSinkOperator,
+    PyMapOperator,
 ]
diff --git a/python/src/pywy/platforms/python/operator/py_execution_operator.py b/python/src/pywy/platforms/python/operator/py_execution_operator.py
index 9b5f1a75..c63e88b5 100644
--- a/python/src/pywy/platforms/python/operator/py_execution_operator.py
+++ b/python/src/pywy/platforms/python/operator/py_execution_operator.py
@@ -9,5 +9,5 @@ class PyExecutionOperator(PywyOperator):
     def prefix(self) -> str:
         return 'Py'
 
-    def execute(self, inputs: List[Type[CH_T]], output: List[Type[CH_T]]):
+    def execute(self, inputs: List[Type[CH_T]], output: List[CH_T]):
         pass
diff --git a/python/src/pywy/platforms/python/operator/py_sink_textfile.py b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
index ab5f5af7..801387cd 100644
--- a/python/src/pywy/platforms/python/operator/py_sink_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
@@ -14,8 +14,8 @@ class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator):
 
     def __init__(self, origin: TextFileSink = None):
         path = None if origin is None else origin.path
-        super().__init__(path)
-        pass
+        type_class = None if origin is None else origin.inputSlot[0]
+        super().__init__(path, type_class)
 
     def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
         self.validate_channels(inputs, outputs)
@@ -23,8 +23,12 @@ class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator):
             file = open(self.path, 'w')
             py_in_iter_channel: PyIteratorChannel = inputs[0]
             iterable = py_in_iter_channel.provide_iterable()
-            for element in iterable:
-                file.write(str(element))
+            if self.inputSlot[0] == str:
+                for element in iterable:
+                    file.write(element)
+            else:
+                for element in iterable:
+                    file.write("{}\n".format(str(element)))
             file.close()
 
         else:
diff --git a/python/src/pywy/platforms/python/operator/py_unary_map.py b/python/src/pywy/platforms/python/operator/py_unary_map.py
new file mode 100644
index 00000000..bf6ef3f0
--- /dev/null
+++ b/python/src/pywy/platforms/python/operator/py_unary_map.py
@@ -0,0 +1,49 @@
+from typing import Set, List, Type
+
+from pywy.core.channel import CH_T
+from pywy.operators.unary import MapOperator
+from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
+from pywy.platforms.python.channels import (
+                                                ChannelDescriptor,
+                                                PyIteratorChannel,
+                                                PY_ITERATOR_CHANNEL_DESCRIPTOR,
+                                                PY_CALLABLE_CHANNEL_DESCRIPTOR,
+                                                PyCallableChannel
+                                            )
+
+
+class PyMapOperator(MapOperator, PyExecutionOperator):
+
+    def __init__(self, origin: MapOperator = None):
+        function = None if origin is None else origin.function
+        super().__init__(function)
+        pass
+
+    def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
+        self.validate_channels(inputs, outputs)
+        udf = self.function
+        if isinstance(inputs[0], PyIteratorChannel):
+            py_in_iter_channel: PyIteratorChannel = inputs[0]
+            py_out_iter_channel: PyIteratorChannel = outputs[0]
+            py_out_iter_channel.accept_iterable(map(udf, py_in_iter_channel.provide_iterable()))
+        elif isinstance(inputs[0], PyCallableChannel):
+            py_in_call_channel: PyCallableChannel = inputs[0]
+            py_out_call_channel: PyCallableChannel = outputs[0]
+
+            def func(iterator):
+                return map(udf, iterator)
+
+            py_out_call_channel.accept_callable(
+                PyCallableChannel.concatenate(
+                    func,
+                    py_in_call_channel.provide_callable()
+                )
+            )
+        else:
+            raise Exception("Channel Type does not supported")
+
+    def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+
+    def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/tests/integration/python_platform_test.py b/python/src/pywy/tests/integration/python_platform_test.py
index 64a7510d..d2ec4f22 100644
--- a/python/src/pywy/tests/integration/python_platform_test.py
+++ b/python/src/pywy/tests/integration/python_platform_test.py
@@ -1,7 +1,6 @@
 import os
 import unittest
 import tempfile
-from os import fdopen
 from typing import List
 
 from pywy.config import RC_TEST_DIR as ROOT
@@ -42,3 +41,33 @@ class TestIntegrationPythonPlatform(unittest.TestCase):
 
         self.assertEqual(selectivity, elements)
         self.assertEqual(lines_filter, lines_platform)
+
+    def test_dummy_map(self):
+        def pre(a: str) -> bool:
+            return 'six' in a
+
+        def convert(a: str) -> int:
+            return len(a)
+
+        fd, path_tmp = tempfile.mkstemp()
+
+        WayangContext() \
+            .register(PYTHON) \
+            .textfile(self.file_10e0) \
+            .filter(pre) \
+            .map(convert) \
+            .store_textfile(path_tmp)
+
+        lines_filter: List[int]
+        with open(self.file_10e0, 'r') as f:
+            lines_filter = list(map(convert, filter(pre, f.readlines())))
+            selectivity = len(list(lines_filter))
+
+        lines_platform: List[int]
+        with open(path_tmp, 'r') as fp:
+            lines_platform = list(map(lambda x: int(x), fp.readlines()))
+            elements = len(lines_platform)
+        os.remove(path_tmp)
+
+        self.assertEqual(selectivity, elements)
+        self.assertEqual(lines_filter, lines_platform)