You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:34 UTC
[incubator-wayang] 13/32: [WAYANG-#8] Translate the operator to execute
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 68069a50b2c7808048bca8b4557cfed7a5b73fe1
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Wed Apr 6 18:07:45 2022 +0200
[WAYANG-#8] Translate the operator to execute
Signed-off-by: bertty <be...@apache.org>
---
python/src/pywy/dataquanta.py | 15 +++++---
python/src/pywy/graph/graph.py | 7 ++--
python/src/pywy/graph/graphtypes.py | 24 ++++++++-----
python/src/pywy/platforms/basic/mapping.py | 4 +--
python/src/pywy/platforms/basic/plugin.py | 10 +++---
python/src/pywy/platforms/python/plugin/plugin.py | 2 +-
python/src/pywy/test.py | 44 +++++++++++------------
python/src/pywy/translate/translator.py | 44 +++++++++++++++++++++++
python/src/pywy/wayangplan/base.py | 5 +++
python/src/pywy/wayangplan/wayang.py | 6 ++--
10 files changed, 113 insertions(+), 48 deletions(-)
diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py
index ce9d1a9d..f44540e9 100644
--- a/python/src/pywy/dataquanta.py
+++ b/python/src/pywy/dataquanta.py
@@ -1,3 +1,6 @@
+from typing import Set
+
+from pywy.translate.translator import Translator
from pywy.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO)
from pywy.wayangplan import *
from pywy.wayangplan.wayang import PywyPlan
@@ -7,13 +10,15 @@ class WayangContext:
"""
This is the entry point for users to work with Wayang.
"""
+ plugins: Set[Plugin]
+
def __init__(self):
self.plugins = set()
"""
add a :class:`Plugin` to the :class:`Context`
"""
- def register(self, *p: Plugin):
+ def register(self, p: Plugin):
self.plugins.add(p)
return self
@@ -28,7 +33,7 @@ class WayangContext:
return DataQuanta(self, TextFileSource(file_path))
def __str__(self):
- return "Plugins: {} \n".format(str(self.plugins))
+ return "Plugins: {}".format(str(self.plugins))
def __repr__(self):
return self.__str__()
@@ -38,6 +43,7 @@ class DataQuanta(GenericTco):
Represents an intermediate result/data flow edge in a [[WayangPlan]].
"""
previous : WyOperator = None
+ context: WayangContext
def __init__(self, context:WayangContext, operator: WyOperator):
self.operator = operator
@@ -55,8 +61,9 @@ class DataQuanta(GenericTco):
def storeTextFile(self: "DataQuanta[I]", path: str) :
last = self.__connect(TextFileSink(path))
plan = PywyPlan(self.context.plugins, [last])
- #plan.print()
- plan.printTuple()
+
+ trs: Translator = Translator(self.context.plugins.pop(), plan)
+ trs.translate()
# TODO add the logic to execute the plan
def __connect(self, op:WyOperator, port_op: int = 0) -> WyOperator:
diff --git a/python/src/pywy/graph/graph.py b/python/src/pywy/graph/graph.py
index c15c86e5..6e0a2d08 100644
--- a/python/src/pywy/graph/graph.py
+++ b/python/src/pywy/graph/graph.py
@@ -35,7 +35,7 @@ class GraphNode(Generic[T]):
def visit(self, parent: 'GraphNode[T]', udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any], visit_status: bool = True):
if(self.visited == visit_status):
return
- self.visited = visit_status
+ self.visited != visit_status
return udf(self, parent)
@@ -59,9 +59,10 @@ class WayangGraph(Generic[T]):
self,
origin: GraphNode[T],
nodes: Iterable[GraphNode[T]],
- udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any]
+ udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any],
+ visit_status: bool = True
):
for node in nodes:
adjacents = node.adjacents(self.created_nodes)
- self.traversal(node, adjacents, udf)
+ self.traversal(node, adjacents, udf, visit_status)
node.visit(origin, udf)
\ No newline at end of file
diff --git a/python/src/pywy/graph/graphtypes.py b/python/src/pywy/graph/graphtypes.py
index 005ef5f4..cef0ff06 100644
--- a/python/src/pywy/graph/graphtypes.py
+++ b/python/src/pywy/graph/graphtypes.py
@@ -26,24 +26,30 @@ class WGraphOfOperator(WayangGraph[NodeOperator]):
return NodeOperator(t)
-class NodeTuple(GraphNode[Tuple[WyOperator, WyOperator]]):
+class NodeVec(GraphNode[List[WyOperator]]):
def __init__(self, op: WyOperator):
- super(NodeTuple, self).__init__((op, None))
+ super(NodeVec, self).__init__([op, None])
- def getadjacents(self) -> Iterable[Tuple[WyOperator, WyOperator]]:
+ def getadjacents(self) -> Iterable[List[WyOperator]]:
operator: WyOperator = self.current[0]
if operator is None or operator.inputs == 0:
return []
return operator.inputOperator
- def build_node(self, t:WyOperator) -> 'NodeTuple':
- return NodeTuple(t)
+ def build_node(self, t:WyOperator) -> 'NodeVec':
+ return NodeVec(t)
-class WGraphOfTuple(WayangGraph[NodeTuple]):
+ def __str__(self):
+ return "NodeVec {}".format(self.current)
+
+ def __repr__(self):
+ return self.__str__()
+
+class WGraphOfVec(WayangGraph[NodeVec]):
def __init__(self, nodes: List[WyOperator]):
- super(WGraphOfTuple, self).__init__(nodes)
+ super(WGraphOfVec, self).__init__(nodes)
- def build_node(self, t:WyOperator) -> NodeTuple:
- return NodeTuple(t)
\ No newline at end of file
+ def build_node(self, t:WyOperator) -> NodeVec:
+ return NodeVec(t)
\ No newline at end of file
diff --git a/python/src/pywy/platforms/basic/mapping.py b/python/src/pywy/platforms/basic/mapping.py
index 63c7e4ca..e8f14a20 100644
--- a/python/src/pywy/platforms/basic/mapping.py
+++ b/python/src/pywy/platforms/basic/mapping.py
@@ -8,10 +8,10 @@ class Mapping:
self.mappings = {}
def add_mapping(self, operator: WyOperator):
- self.mappings[operator.name] = type(operator)
+ self.mappings[operator.name_basic()] = type(operator)
def get_instanceof(self, operator: WyOperator):
- template = self.mappings[operator.name]
+ template = self.mappings[operator.name_basic()]
if template is None:
raise Exception(
"the operator {} does not have valid mapping".format(
diff --git a/python/src/pywy/platforms/basic/plugin.py b/python/src/pywy/platforms/basic/plugin.py
index 9176e9f3..88da7f73 100644
--- a/python/src/pywy/platforms/basic/plugin.py
+++ b/python/src/pywy/platforms/basic/plugin.py
@@ -1,3 +1,5 @@
+from typing import List, Set
+
from pywy.platforms.basic.platform import Platform
from pywy.platforms.basic.mapping import Mapping
@@ -10,18 +12,18 @@ class Plugin:
In turn, it may require several :class:`Platform`s for its operation.
"""
- platforms = []
+ platforms: Set[Platform]
mappings: Mapping
- def __init__(self, *platform:Platform, mappings: Mapping = Mapping()):
- self.platforms = list(platform)
+ def __init__(self, platforms:Set[Platform], mappings: Mapping = Mapping()):
+ self.platforms = platforms
self.mappings = mappings
def get_mappings(self) -> Mapping:
return self.mappings
def __str__(self):
- return "Platforms: {}".format(str(self.platforms))
+ return "Platforms: {}, Mappings: {}".format(str(self.platforms), str(self.mappings))
def __repr__(self):
return self.__str__()
diff --git a/python/src/pywy/platforms/python/plugin/plugin.py b/python/src/pywy/platforms/python/plugin/plugin.py
index 0f870029..0d42db7a 100644
--- a/python/src/pywy/platforms/python/plugin/plugin.py
+++ b/python/src/pywy/platforms/python/plugin/plugin.py
@@ -7,4 +7,4 @@ from pywy.platforms.python.mappings import PywyOperatorMappings
class PythonPlugin(Plugin):
def __init__(self):
- super(PythonPlugin, self).__init__(PythonPlatform(), PywyOperatorMappings)
\ No newline at end of file
+ super(PythonPlugin, self).__init__({PythonPlatform()}, PywyOperatorMappings)
\ No newline at end of file
diff --git a/python/src/pywy/test.py b/python/src/pywy/test.py
index de0153d6..2de508e6 100644
--- a/python/src/pywy/test.py
+++ b/python/src/pywy/test.py
@@ -1,37 +1,37 @@
from pywy.platforms.basic.platform import Platform
from pywy.dataquanta import WayangContext
from pywy.platforms.python.channels import Channel
-from pywy.plugins import java, spark
+from pywy.plugins import java, spark, python
from pywy.wayangplan.unary import *
-p = Platform("nana")
-print("LALA "+str(p))
-pt = type(p)
-print(pt)
-p2 = pt("chao")
-print(p2)
-print(type(p2))
-
-
-print(str(WayangContext().register(java, spark)))
+# p = Platform("nana")
+# print("LALA "+str(p))
+# pt = type(p)
+# print(pt)
+# p2 = pt("chao")
+# print(p2)
+# print(type(p2))
+#
+#
+# print(str(WayangContext().register(java, spark)))
from pywy.types import Predicate, getTypePredicate
-
-predicate : Predicate = lambda x: x % 2 == 0
-getTypePredicate(predicate)
+#
+# predicate : Predicate = lambda x: x % 2 == 0
+# getTypePredicate(predicate)
def pre(a:str):
return len(a) > 3
-
-def func(s:str) -> int:
- return len(s)
-
-def fmfunc(i:int) -> str:
- for x in range(i):
- yield str(x)
+#
+# def func(s:str) -> int:
+# return len(s)
+#
+# def fmfunc(i:int) -> str:
+# for x in range(i):
+# yield str(x)
fileop = WayangContext()\
- .register(java)\
+ .register(python)\
.textFile("/Users/bertty/databloom/blossom/python/resources/test.input")\
.filter(pre)\
.storeTextFile("/Users/bertty/databloom/blossom/python/resources/test.output")
diff --git a/python/src/pywy/translate/translator.py b/python/src/pywy/translate/translator.py
index 8ec29399..bb646c4f 100644
--- a/python/src/pywy/translate/translator.py
+++ b/python/src/pywy/translate/translator.py
@@ -1,12 +1,56 @@
+from pywy.graph.graphtypes import WGraphOfVec, NodeVec
from pywy.platforms.basic.plugin import Plugin
+from pywy.wayangplan import WyOperator
from pywy.wayangplan.wayang import PywyPlan
from pywy.platforms.basic.mapping import Mapping
class Translator:
+ plugin: Plugin
+ plan : PywyPlan
+
def __init__(self, plugin: Plugin, plan: PywyPlan):
self.plugin = plugin
self.plan = plan
def translate(self):
mappings:Mapping = self.plugin.get_mappings()
+ graph = WGraphOfVec(self.plan.sinks)
+ def translate2plugin(current: NodeVec, previous: NodeVec):
+ if current is None:
+ return
+
+ if current.current[1] is None:
+ current.current[1] = mappings.get_instanceof(current.current[0])
+
+ if previous is None:
+ return
+ if previous.current[1] is None:
+ previous.current[1] = mappings.get_instanceof(previous.current[0])
+
+ # TODO it is not necessarily 0
+ current.current[1].connect(0, previous.current[1], 0)
+
+ graph.traversal(None, graph.starting_nodes, translate2plugin)
+
+ def print_plan(current: NodeVec, previous: NodeVec):
+ if current is None:
+ print("this is source")
+ print(previous.current)
+ return
+ if previous is None:
+ print("this is sink")
+ print(current.current)
+ return
+
+ print(
+ "############\n{}\n@@@@@ => previous is\n{}\n############\n"
+ .format(
+ current.current,
+ previous.current
+ )
+ )
+
+ graph.traversal(None, graph.starting_nodes, print_plan, False)
+ print("here")
+
diff --git a/python/src/pywy/wayangplan/base.py b/python/src/pywy/wayangplan/base.py
index a87a2f6a..92d4dfca 100644
--- a/python/src/pywy/wayangplan/base.py
+++ b/python/src/pywy/wayangplan/base.py
@@ -63,6 +63,11 @@ class WyOperator:
def postfix(self) -> str:
return ''
+ def name_basic(self, with_prefix: bool = False, with_postfix:bool = True):
+ prefix = len(self.prefix()) if not with_prefix else 0
+ postfix = len(self.postfix()) if not with_postfix else 0
+ return self.name[prefix:len(self.name) - postfix]
+
def __str__(self):
return "BaseOperator: \n\t- name: {}\n\t- inputs: {} {}\n\t- outputs: {} {} \n".format(
str(self.name),
diff --git a/python/src/pywy/wayangplan/wayang.py b/python/src/pywy/wayangplan/wayang.py
index 220f4416..c4d8205d 100644
--- a/python/src/pywy/wayangplan/wayang.py
+++ b/python/src/pywy/wayangplan/wayang.py
@@ -1,7 +1,7 @@
from typing import Iterable, Set
from pywy.graph.graph import WayangGraph
-from pywy.graph.graphtypes import WGraphOfOperator, NodeOperator, WGraphOfTuple, NodeTuple
+from pywy.graph.graphtypes import WGraphOfOperator, NodeOperator, WGraphOfVec, NodeVec
from pywy.wayangplan.sink import SinkOperator
from pywy.platforms.basic.plugin import Plugin
@@ -16,7 +16,7 @@ class PywyPlan:
self.set_graph()
def set_graph(self):
- self.graph = WGraphOfTuple(self.sinks)
+ self.graph = WGraphOfVec(self.sinks)
def print(self):
def print_plan(current: NodeOperator, previous: NodeOperator):
@@ -40,7 +40,7 @@ class PywyPlan:
def printTuple(self):
- def print_plan(current: NodeTuple, previous: NodeTuple):
+ def print_plan(current: NodeVec, previous: NodeVec):
if current is None:
print("this is source")
print(previous.current)