You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:38 UTC

[incubator-wayang] 17/32: [WAYANG-#8] Executor in python platformb

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit f32d04a6e06ac0c9eff5e4a2d94d264700faa3c5
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Wed Apr 6 21:39:30 2022 +0200

    [WAYANG-#8] Executor in python platformb
    
    Signed-off-by: bertty <be...@apache.org>
---
 python/src/pywy/dataquanta.py                      |  6 ++-
 python/src/pywy/platforms/basic/executor.py        |  9 ++++
 python/src/pywy/platforms/basic/plugin.py          |  4 ++
 python/src/pywy/platforms/basic/translator.py      | 56 ++++++++++---------
 .../pywy/platforms/python/execution/executor.py    | 62 ++++++++++++++++++++++
 ...ExecutionOperator.py => PyExecutionOperator.py} |  2 +-
 .../platforms/python/operators/PyFilterOperator.py |  4 +-
 .../python/operators/PyTextFileSinkOperator.py     |  4 +-
 .../python/operators/PyTextFileSourceOperator.py   |  4 +-
 .../pywy/platforms/python/operators/__init__.py    |  4 +-
 python/src/pywy/platforms/python/plugin/plugin.py  |  7 ++-
 python/src/pywy/wayangplan/base.py                 | 10 ++--
 12 files changed, 131 insertions(+), 41 deletions(-)

diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py
index d063cbda..14019423 100644
--- a/python/src/pywy/dataquanta.py
+++ b/python/src/pywy/dataquanta.py
@@ -62,8 +62,10 @@ class DataQuanta(GenericTco):
         last = self.__connect(TextFileSink(path))
         plan = PywyPlan(self.context.plugins, [last])
 
-        trs: Translator =  Translator(self.context.plugins.pop(), plan)
-        trs.translate()
+        plug = self.context.plugins.pop()
+        trs: Translator =  Translator(plug, plan)
+        new_plan = trs.translate()
+        plug.get_executor().execute(new_plan)
         # TODO add the logic to execute the plan
 
     def __connect(self, op:PywyOperator, port_op: int = 0) -> PywyOperator:
diff --git a/python/src/pywy/platforms/basic/executor.py b/python/src/pywy/platforms/basic/executor.py
new file mode 100644
index 00000000..31fcb2c7
--- /dev/null
+++ b/python/src/pywy/platforms/basic/executor.py
@@ -0,0 +1,9 @@
+
+
+class Executor:
+
+    def __init__(self):
+        pass
+
+    def execute(self, plan):
+        pass
\ No newline at end of file
diff --git a/python/src/pywy/platforms/basic/plugin.py b/python/src/pywy/platforms/basic/plugin.py
index 1f156d9d..838d88a4 100644
--- a/python/src/pywy/platforms/basic/plugin.py
+++ b/python/src/pywy/platforms/basic/plugin.py
@@ -1,5 +1,6 @@
 from typing import Set
 
+from pywy.platforms.basic.executor import Executor
 from pywy.platforms.basic.platform import Platform
 from pywy.platforms.basic.mapping import Mapping
 
@@ -22,6 +23,9 @@ class Plugin:
     def get_mappings(self) -> Mapping:
         return self.mappings
 
+    def get_executor(self) -> Executor:
+        pass
+
     def __str__(self):
         return "Platforms: {}, Mappings: {}".format(str(self.platforms), str(self.mappings))
 
diff --git a/python/src/pywy/platforms/basic/translator.py b/python/src/pywy/platforms/basic/translator.py
index d7d545e2..6031f7a2 100644
--- a/python/src/pywy/platforms/basic/translator.py
+++ b/python/src/pywy/platforms/basic/translator.py
@@ -15,41 +15,45 @@ class Translator:
     def translate(self):
         mappings:Mapping = self.plugin.get_mappings()
         graph = WGraphOfVec(self.plan.sinks)
-        def translate2plugin(current: NodeVec, previous: NodeVec):
+        def translate2plugin(current: NodeVec, next: NodeVec):
             if current is None:
                 return
 
             if current.current[1] is None:
                 current.current[1] = mappings.get_instanceof(current.current[0])
 
-            if previous is None:
+            if next is None:
                 return
-            if previous.current[1] is None:
-                previous.current[1] = mappings.get_instanceof(previous.current[0])
+            if next.current[1] is None:
+                next.current[1] = mappings.get_instanceof(next.current[0])
 
             # TODO not necesary it it 0
-            current.current[1].connect(0, previous.current[1], 0)
+            current.current[1].connect(0, next.current[1], 0)
 
         graph.traversal(None, graph.starting_nodes, translate2plugin)
 
-        def print_plan(current: NodeVec, previous: NodeVec):
-            if current is None:
-                print("this is source")
-                print(previous.current)
-                return
-            if previous is None:
-                print("this is sink")
-                print(current.current)
-                return
-
-            print(
-                "############\n{}\n@@@@@ => previous is\n{}\n############\n"
-                    .format(
-                        current.current,
-                        previous.current
-                     )
-            )
-
-        graph.traversal(None, graph.starting_nodes, print_plan, False)
-        print("here")
-
+        # def print_plan(current: NodeVec, previous: NodeVec):
+        #     if current is None:
+        #         print("this is source")
+        #         print(previous.current)
+        #         return
+        #     if previous is None:
+        #         print("this is sink")
+        #         print(current.current)
+        #         return
+        #
+        #     print(
+        #         "############\n{}\n@@@@@ => previous is\n{}\n############\n"
+        #             .format(
+        #                 current.current,
+        #                 previous.current
+        #              )
+        #     )
+        #
+        # graph.traversal(None, graph.starting_nodes, print_plan, False)
+
+        node = []
+        for elem in graph.starting_nodes:
+            node.append(elem.current[1])
+
+        return PywyPlan(self.plugin, node)
\ No newline at end of file
diff --git a/python/src/pywy/platforms/python/execution/executor.py b/python/src/pywy/platforms/python/execution/executor.py
new file mode 100644
index 00000000..fa17ae8f
--- /dev/null
+++ b/python/src/pywy/platforms/python/execution/executor.py
@@ -0,0 +1,62 @@
+from typing import List
+
+from pywy.graph.graphtypes import WGraphOfOperator, NodeOperator
+from pywy.platforms.basic.channel import Channel
+from pywy.platforms.basic.executor import Executor
+from pywy.platforms.basic.plan import PywyPlan
+from pywy.platforms.python.operators.PyExecutionOperator import PyExecutionOperator
+
+
+class PyExecutor(Executor):
+
+    def __init__(self):
+        super(PyExecutor, self).__init__()
+
+    def execute(self, plan):
+        pywyPlan: PywyPlan = plan
+        graph = WGraphOfOperator(pywyPlan.sinks)
+
+        def exec(current: NodeOperator, next: NodeOperator):
+            if current is None:
+                return
+
+            py_current: PyExecutionOperator = current.current
+            if py_current.outputs == 0:
+                py_current.execute(py_current.inputChannel, [])
+                return
+
+            if next is None:
+                return
+            py_next: PyExecutionOperator = next.current
+            outputs = py_current.get_output_channeldescriptors()
+            inputs = py_next.get_input_channeldescriptors()
+
+            intersect = outputs.intersection(inputs)
+            if len(intersect) == 0:
+                raise Exception(
+                    "The operator(A) {} can't connect with (B) {}, because the output of (A) is {} and the input of (B) is {}"
+                     .format(
+                        py_current,
+                        py_next,
+                        outputs,
+                        inputs
+                    )
+                )
+            if len(intersect) > 1:
+                raise Exception(
+                    "The interaction between the operator (A) {} and (B) {}, can't be decided because are several channel availables {}"
+                     .format(
+                        py_current,
+                        py_next,
+                        intersect
+                    )
+                )
+            #TODO validate if is valite for several output
+            py_current.outputChannel: List[Channel] = [intersect.pop().create_instance()]
+
+            py_current.execute(py_current.inputChannel, py_current.outputChannel)
+
+            py_next.inputChannel = py_current.outputChannel
+
+
+        graph.traversal(None, graph.starting_nodes, exec)
\ No newline at end of file
diff --git a/python/src/pywy/platforms/python/operators/PythonExecutionOperator.py b/python/src/pywy/platforms/python/operators/PyExecutionOperator.py
similarity index 78%
rename from python/src/pywy/platforms/python/operators/PythonExecutionOperator.py
rename to python/src/pywy/platforms/python/operators/PyExecutionOperator.py
index 2db44f03..a9d4ebd5 100644
--- a/python/src/pywy/platforms/python/operators/PythonExecutionOperator.py
+++ b/python/src/pywy/platforms/python/operators/PyExecutionOperator.py
@@ -1,7 +1,7 @@
 from pywy.wayangplan.base import PywyOperator
 from pywy.platforms.python.channels import Channel
 
-class PythonExecutionOperator(PywyOperator):
+class PyExecutionOperator(PywyOperator):
 
     def prefix(self) -> str:
         return 'Py'
diff --git a/python/src/pywy/platforms/python/operators/PyFilterOperator.py b/python/src/pywy/platforms/python/operators/PyFilterOperator.py
index 7c0bbf7e..7d6a503c 100644
--- a/python/src/pywy/platforms/python/operators/PyFilterOperator.py
+++ b/python/src/pywy/platforms/python/operators/PyFilterOperator.py
@@ -1,6 +1,6 @@
 from typing import Set
 from pywy.wayangplan.unary import FilterOperator
-from pywy.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
+from pywy.platforms.python.operators.PyExecutionOperator import PyExecutionOperator
 from pywy.platforms.python.channels import (
                                                 Channel,
                                                 ChannelDescriptor,
@@ -11,7 +11,7 @@ from pywy.platforms.python.channels import (
                                             )
 
 
-class PyFilterOperator(FilterOperator, PythonExecutionOperator):
+class PyFilterOperator(FilterOperator, PyExecutionOperator):
 
     def __init__(self, origin: FilterOperator = None):
         predicate = None if origin is None else origin.predicate
diff --git a/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py b/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py
index 6d9ffaae..5a67cfaf 100644
--- a/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py
+++ b/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py
@@ -1,6 +1,6 @@
 from typing import Set
 from pywy.wayangplan.sink import TextFileSink
-from pywy.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
+from pywy.platforms.python.operators.PyExecutionOperator import PyExecutionOperator
 from pywy.platforms.python.channels import (
                                                 Channel,
                                                 ChannelDescriptor,
@@ -9,7 +9,7 @@ from pywy.platforms.python.channels import (
                                             )
 
 
-class PyTextFileSinkOperator(TextFileSink, PythonExecutionOperator):
+class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator):
 
     def __init__(self, origin: TextFileSink = None):
         path = None if origin is None else origin.path
diff --git a/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py b/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py
index 96d9f96d..bac1f844 100644
--- a/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py
+++ b/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py
@@ -1,6 +1,6 @@
 from typing import Set
 from pywy.wayangplan.source import TextFileSource
-from pywy.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
+from pywy.platforms.python.operators.PyExecutionOperator import PyExecutionOperator
 from pywy.platforms.python.channels import (
                                                 Channel,
                                                 ChannelDescriptor,
@@ -9,7 +9,7 @@ from pywy.platforms.python.channels import (
                                             )
 
 
-class PyTextFileSourceOperator(TextFileSource, PythonExecutionOperator):
+class PyTextFileSourceOperator(TextFileSource, PyExecutionOperator):
 
     def __init__(self, origin: TextFileSource = None):
         path = None if origin is None else origin.path
diff --git a/python/src/pywy/platforms/python/operators/__init__.py b/python/src/pywy/platforms/python/operators/__init__.py
index 5ddf61ae..2f7f3ca1 100644
--- a/python/src/pywy/platforms/python/operators/__init__.py
+++ b/python/src/pywy/platforms/python/operators/__init__.py
@@ -1,10 +1,10 @@
-from pywy.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
+from pywy.platforms.python.operators.PyExecutionOperator import PyExecutionOperator
 from pywy.platforms.python.operators.PyFilterOperator import PyFilterOperator
 from pywy.platforms.python.operators.PyTextFileSourceOperator import PyTextFileSourceOperator
 from pywy.platforms.python.operators.PyTextFileSinkOperator import PyTextFileSinkOperator
 
 __ALL__ = [
-    PythonExecutionOperator,
+    PyExecutionOperator,
     PyFilterOperator,
     PyTextFileSourceOperator,
     PyTextFileSinkOperator
diff --git a/python/src/pywy/platforms/python/plugin/plugin.py b/python/src/pywy/platforms/python/plugin/plugin.py
index 010c49cd..34814638 100644
--- a/python/src/pywy/platforms/python/plugin/plugin.py
+++ b/python/src/pywy/platforms/python/plugin/plugin.py
@@ -1,3 +1,5 @@
+from pywy.platforms.basic.executor import Executor
+from pywy.platforms.python.execution.executor import PyExecutor
 from pywy.platforms.python.platform import PythonPlatform
 from pywy.platforms.basic.plugin import Plugin
 from pywy.platforms.python.mappings import PywyOperatorMappings
@@ -6,4 +8,7 @@ from pywy.platforms.python.mappings import PywyOperatorMappings
 class PythonPlugin(Plugin):
 
     def __init__(self):
-        super(PythonPlugin, self).__init__({PythonPlatform()}, PywyOperatorMappings)
\ No newline at end of file
+        super(PythonPlugin, self).__init__({PythonPlatform()}, PywyOperatorMappings)
+
+    def get_executor(self) -> Executor:
+        return PyExecutor()
\ No newline at end of file
diff --git a/python/src/pywy/wayangplan/base.py b/python/src/pywy/wayangplan/base.py
index 1a81052a..ebe2e2ff 100644
--- a/python/src/pywy/wayangplan/base.py
+++ b/python/src/pywy/wayangplan/base.py
@@ -1,14 +1,16 @@
 from typing import ( TypeVar, Optional, List, Set )
-from pywy.platforms.basic.channel import ChannelDescriptor
+from pywy.platforms.basic.channel import ChannelDescriptor, Channel
 
 class PywyOperator:
 
     inputSlot : List[TypeVar]
-    inputChannel : ChannelDescriptor
+    inputChannel : List[Channel]
+    inputChannelDescriptor : List[ChannelDescriptor]
     inputOperator: List['PywyOperator']
     inputs : int
     outputSlot : List[TypeVar]
-    outputChannel: ChannelDescriptor
+    outputChannel: List[Channel]
+    outputChannelDescriptor: List[ChannelDescriptor]
     outputOperator: List['PywyOperator']
     outputs: int
 
@@ -26,6 +28,8 @@ class PywyOperator:
         self.outputs = output_lenght
         self.inputOperator = [None] * self.inputs
         self.outputOperator = [None] * self.outputs
+        self.inputChannel = [None] * self.inputs
+        self.outputChannel = [None] * self.outputs
 
     def validate_inputs(self, vec):
         if len(vec) != self.inputs: