You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:38 UTC
[incubator-wayang] 17/32: [WAYANG-#8] Executor in python platform
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit f32d04a6e06ac0c9eff5e4a2d94d264700faa3c5
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Wed Apr 6 21:39:30 2022 +0200
[WAYANG-#8] Executor in python platform
Signed-off-by: bertty <be...@apache.org>
---
python/src/pywy/dataquanta.py | 6 ++-
python/src/pywy/platforms/basic/executor.py | 9 ++++
python/src/pywy/platforms/basic/plugin.py | 4 ++
python/src/pywy/platforms/basic/translator.py | 56 ++++++++++---------
.../pywy/platforms/python/execution/executor.py | 62 ++++++++++++++++++++++
...ExecutionOperator.py => PyExecutionOperator.py} | 2 +-
.../platforms/python/operators/PyFilterOperator.py | 4 +-
.../python/operators/PyTextFileSinkOperator.py | 4 +-
.../python/operators/PyTextFileSourceOperator.py | 4 +-
.../pywy/platforms/python/operators/__init__.py | 4 +-
python/src/pywy/platforms/python/plugin/plugin.py | 7 ++-
python/src/pywy/wayangplan/base.py | 10 ++--
12 files changed, 131 insertions(+), 41 deletions(-)
diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py
index d063cbda..14019423 100644
--- a/python/src/pywy/dataquanta.py
+++ b/python/src/pywy/dataquanta.py
@@ -62,8 +62,10 @@ class DataQuanta(GenericTco):
last = self.__connect(TextFileSink(path))
plan = PywyPlan(self.context.plugins, [last])
- trs: Translator = Translator(self.context.plugins.pop(), plan)
- trs.translate()
+ plug = self.context.plugins.pop()
+ trs: Translator = Translator(plug, plan)
+ new_plan = trs.translate()
+ plug.get_executor().execute(new_plan)
# TODO add the logic to execute the plan
def __connect(self, op:PywyOperator, port_op: int = 0) -> PywyOperator:
diff --git a/python/src/pywy/platforms/basic/executor.py b/python/src/pywy/platforms/basic/executor.py
new file mode 100644
index 00000000..31fcb2c7
--- /dev/null
+++ b/python/src/pywy/platforms/basic/executor.py
@@ -0,0 +1,9 @@
+
+
+class Executor:
+
+ def __init__(self):
+ pass
+
+ def execute(self, plan):
+ pass
\ No newline at end of file
diff --git a/python/src/pywy/platforms/basic/plugin.py b/python/src/pywy/platforms/basic/plugin.py
index 1f156d9d..838d88a4 100644
--- a/python/src/pywy/platforms/basic/plugin.py
+++ b/python/src/pywy/platforms/basic/plugin.py
@@ -1,5 +1,6 @@
from typing import Set
+from pywy.platforms.basic.executor import Executor
from pywy.platforms.basic.platform import Platform
from pywy.platforms.basic.mapping import Mapping
@@ -22,6 +23,9 @@ class Plugin:
def get_mappings(self) -> Mapping:
return self.mappings
+ def get_executor(self) -> Executor:
+ pass
+
def __str__(self):
return "Platforms: {}, Mappings: {}".format(str(self.platforms), str(self.mappings))
diff --git a/python/src/pywy/platforms/basic/translator.py b/python/src/pywy/platforms/basic/translator.py
index d7d545e2..6031f7a2 100644
--- a/python/src/pywy/platforms/basic/translator.py
+++ b/python/src/pywy/platforms/basic/translator.py
@@ -15,41 +15,45 @@ class Translator:
def translate(self):
mappings:Mapping = self.plugin.get_mappings()
graph = WGraphOfVec(self.plan.sinks)
- def translate2plugin(current: NodeVec, previous: NodeVec):
+ def translate2plugin(current: NodeVec, next: NodeVec):
if current is None:
return
if current.current[1] is None:
current.current[1] = mappings.get_instanceof(current.current[0])
- if previous is None:
+ if next is None:
return
- if previous.current[1] is None:
- previous.current[1] = mappings.get_instanceof(previous.current[0])
+ if next.current[1] is None:
+ next.current[1] = mappings.get_instanceof(next.current[0])
# TODO not necesary it it 0
- current.current[1].connect(0, previous.current[1], 0)
+ current.current[1].connect(0, next.current[1], 0)
graph.traversal(None, graph.starting_nodes, translate2plugin)
- def print_plan(current: NodeVec, previous: NodeVec):
- if current is None:
- print("this is source")
- print(previous.current)
- return
- if previous is None:
- print("this is sink")
- print(current.current)
- return
-
- print(
- "############\n{}\n@@@@@ => previous is\n{}\n############\n"
- .format(
- current.current,
- previous.current
- )
- )
-
- graph.traversal(None, graph.starting_nodes, print_plan, False)
- print("here")
-
+ # def print_plan(current: NodeVec, previous: NodeVec):
+ # if current is None:
+ # print("this is source")
+ # print(previous.current)
+ # return
+ # if previous is None:
+ # print("this is sink")
+ # print(current.current)
+ # return
+ #
+ # print(
+ # "############\n{}\n@@@@@ => previous is\n{}\n############\n"
+ # .format(
+ # current.current,
+ # previous.current
+ # )
+ # )
+ #
+ # graph.traversal(None, graph.starting_nodes, print_plan, False)
+
+ node = []
+ for elem in graph.starting_nodes:
+ node.append(elem.current[1])
+
+ return PywyPlan(self.plugin, node)
\ No newline at end of file
diff --git a/python/src/pywy/platforms/python/execution/executor.py b/python/src/pywy/platforms/python/execution/executor.py
new file mode 100644
index 00000000..fa17ae8f
--- /dev/null
+++ b/python/src/pywy/platforms/python/execution/executor.py
@@ -0,0 +1,62 @@
+from typing import List
+
+from pywy.graph.graphtypes import WGraphOfOperator, NodeOperator
+from pywy.platforms.basic.channel import Channel
+from pywy.platforms.basic.executor import Executor
+from pywy.platforms.basic.plan import PywyPlan
+from pywy.platforms.python.operators.PyExecutionOperator import PyExecutionOperator
+
+
+class PyExecutor(Executor):
+
+ def __init__(self):
+ super(PyExecutor, self).__init__()
+
+ def execute(self, plan):
+ pywyPlan: PywyPlan = plan
+ graph = WGraphOfOperator(pywyPlan.sinks)
+
+ def exec(current: NodeOperator, next: NodeOperator):
+ if current is None:
+ return
+
+ py_current: PyExecutionOperator = current.current
+ if py_current.outputs == 0:
+ py_current.execute(py_current.inputChannel, [])
+ return
+
+ if next is None:
+ return
+ py_next: PyExecutionOperator = next.current
+ outputs = py_current.get_output_channeldescriptors()
+ inputs = py_next.get_input_channeldescriptors()
+
+ intersect = outputs.intersection(inputs)
+ if len(intersect) == 0:
+ raise Exception(
+ "The operator(A) {} can't connect with (B) {}, because the output of (A) is {} and the input of (B) is {}"
+ .format(
+ py_current,
+ py_next,
+ outputs,
+ inputs
+ )
+ )
+ if len(intersect) > 1:
+ raise Exception(
+ "The interaction between the operator (A) {} and (B) {}, can't be decided because are several channel availables {}"
+ .format(
+ py_current,
+ py_next,
+ intersect
+ )
+ )
+ # TODO validate whether this is valid for several outputs
+ py_current.outputChannel: List[Channel] = [intersect.pop().create_instance()]
+
+ py_current.execute(py_current.inputChannel, py_current.outputChannel)
+
+ py_next.inputChannel = py_current.outputChannel
+
+
+ graph.traversal(None, graph.starting_nodes, exec)
\ No newline at end of file
diff --git a/python/src/pywy/platforms/python/operators/PythonExecutionOperator.py b/python/src/pywy/platforms/python/operators/PyExecutionOperator.py
similarity index 78%
rename from python/src/pywy/platforms/python/operators/PythonExecutionOperator.py
rename to python/src/pywy/platforms/python/operators/PyExecutionOperator.py
index 2db44f03..a9d4ebd5 100644
--- a/python/src/pywy/platforms/python/operators/PythonExecutionOperator.py
+++ b/python/src/pywy/platforms/python/operators/PyExecutionOperator.py
@@ -1,7 +1,7 @@
from pywy.wayangplan.base import PywyOperator
from pywy.platforms.python.channels import Channel
-class PythonExecutionOperator(PywyOperator):
+class PyExecutionOperator(PywyOperator):
def prefix(self) -> str:
return 'Py'
diff --git a/python/src/pywy/platforms/python/operators/PyFilterOperator.py b/python/src/pywy/platforms/python/operators/PyFilterOperator.py
index 7c0bbf7e..7d6a503c 100644
--- a/python/src/pywy/platforms/python/operators/PyFilterOperator.py
+++ b/python/src/pywy/platforms/python/operators/PyFilterOperator.py
@@ -1,6 +1,6 @@
from typing import Set
from pywy.wayangplan.unary import FilterOperator
-from pywy.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
+from pywy.platforms.python.operators.PyExecutionOperator import PyExecutionOperator
from pywy.platforms.python.channels import (
Channel,
ChannelDescriptor,
@@ -11,7 +11,7 @@ from pywy.platforms.python.channels import (
)
-class PyFilterOperator(FilterOperator, PythonExecutionOperator):
+class PyFilterOperator(FilterOperator, PyExecutionOperator):
def __init__(self, origin: FilterOperator = None):
predicate = None if origin is None else origin.predicate
diff --git a/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py b/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py
index 6d9ffaae..5a67cfaf 100644
--- a/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py
+++ b/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py
@@ -1,6 +1,6 @@
from typing import Set
from pywy.wayangplan.sink import TextFileSink
-from pywy.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
+from pywy.platforms.python.operators.PyExecutionOperator import PyExecutionOperator
from pywy.platforms.python.channels import (
Channel,
ChannelDescriptor,
@@ -9,7 +9,7 @@ from pywy.platforms.python.channels import (
)
-class PyTextFileSinkOperator(TextFileSink, PythonExecutionOperator):
+class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator):
def __init__(self, origin: TextFileSink = None):
path = None if origin is None else origin.path
diff --git a/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py b/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py
index 96d9f96d..bac1f844 100644
--- a/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py
+++ b/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py
@@ -1,6 +1,6 @@
from typing import Set
from pywy.wayangplan.source import TextFileSource
-from pywy.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
+from pywy.platforms.python.operators.PyExecutionOperator import PyExecutionOperator
from pywy.platforms.python.channels import (
Channel,
ChannelDescriptor,
@@ -9,7 +9,7 @@ from pywy.platforms.python.channels import (
)
-class PyTextFileSourceOperator(TextFileSource, PythonExecutionOperator):
+class PyTextFileSourceOperator(TextFileSource, PyExecutionOperator):
def __init__(self, origin: TextFileSource = None):
path = None if origin is None else origin.path
diff --git a/python/src/pywy/platforms/python/operators/__init__.py b/python/src/pywy/platforms/python/operators/__init__.py
index 5ddf61ae..2f7f3ca1 100644
--- a/python/src/pywy/platforms/python/operators/__init__.py
+++ b/python/src/pywy/platforms/python/operators/__init__.py
@@ -1,10 +1,10 @@
-from pywy.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
+from pywy.platforms.python.operators.PyExecutionOperator import PyExecutionOperator
from pywy.platforms.python.operators.PyFilterOperator import PyFilterOperator
from pywy.platforms.python.operators.PyTextFileSourceOperator import PyTextFileSourceOperator
from pywy.platforms.python.operators.PyTextFileSinkOperator import PyTextFileSinkOperator
__ALL__ = [
- PythonExecutionOperator,
+ PyExecutionOperator,
PyFilterOperator,
PyTextFileSourceOperator,
PyTextFileSinkOperator
diff --git a/python/src/pywy/platforms/python/plugin/plugin.py b/python/src/pywy/platforms/python/plugin/plugin.py
index 010c49cd..34814638 100644
--- a/python/src/pywy/platforms/python/plugin/plugin.py
+++ b/python/src/pywy/platforms/python/plugin/plugin.py
@@ -1,3 +1,5 @@
+from pywy.platforms.basic.executor import Executor
+from pywy.platforms.python.execution.executor import PyExecutor
from pywy.platforms.python.platform import PythonPlatform
from pywy.platforms.basic.plugin import Plugin
from pywy.platforms.python.mappings import PywyOperatorMappings
@@ -6,4 +8,7 @@ from pywy.platforms.python.mappings import PywyOperatorMappings
class PythonPlugin(Plugin):
def __init__(self):
- super(PythonPlugin, self).__init__({PythonPlatform()}, PywyOperatorMappings)
\ No newline at end of file
+ super(PythonPlugin, self).__init__({PythonPlatform()}, PywyOperatorMappings)
+
+ def get_executor(self) -> Executor:
+ return PyExecutor()
\ No newline at end of file
diff --git a/python/src/pywy/wayangplan/base.py b/python/src/pywy/wayangplan/base.py
index 1a81052a..ebe2e2ff 100644
--- a/python/src/pywy/wayangplan/base.py
+++ b/python/src/pywy/wayangplan/base.py
@@ -1,14 +1,16 @@
from typing import ( TypeVar, Optional, List, Set )
-from pywy.platforms.basic.channel import ChannelDescriptor
+from pywy.platforms.basic.channel import ChannelDescriptor, Channel
class PywyOperator:
inputSlot : List[TypeVar]
- inputChannel : ChannelDescriptor
+ inputChannel : List[Channel]
+ inputChannelDescriptor : List[ChannelDescriptor]
inputOperator: List['PywyOperator']
inputs : int
outputSlot : List[TypeVar]
- outputChannel: ChannelDescriptor
+ outputChannel: List[Channel]
+ outputChannelDescriptor: List[ChannelDescriptor]
outputOperator: List['PywyOperator']
outputs: int
@@ -26,6 +28,8 @@ class PywyOperator:
self.outputs = output_lenght
self.inputOperator = [None] * self.inputs
self.outputOperator = [None] * self.outputs
+ self.inputChannel = [None] * self.inputs
+ self.outputChannel = [None] * self.outputs
def validate_inputs(self, vec):
if len(vec) != self.inputs: