You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:34 UTC

[incubator-wayang] 13/32: [WAYANG-#8] Translate the operator to execute

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 68069a50b2c7808048bca8b4557cfed7a5b73fe1
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Wed Apr 6 18:07:45 2022 +0200

    [WAYANG-#8] Translate the operator to execute
    
    Signed-off-by: bertty <be...@apache.org>
---
 python/src/pywy/dataquanta.py                     | 15 +++++---
 python/src/pywy/graph/graph.py                    |  7 ++--
 python/src/pywy/graph/graphtypes.py               | 24 ++++++++-----
 python/src/pywy/platforms/basic/mapping.py        |  4 +--
 python/src/pywy/platforms/basic/plugin.py         | 10 +++---
 python/src/pywy/platforms/python/plugin/plugin.py |  2 +-
 python/src/pywy/test.py                           | 44 +++++++++++------------
 python/src/pywy/translate/translator.py           | 44 +++++++++++++++++++++++
 python/src/pywy/wayangplan/base.py                |  5 +++
 python/src/pywy/wayangplan/wayang.py              |  6 ++--
 10 files changed, 113 insertions(+), 48 deletions(-)

diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py
index ce9d1a9d..f44540e9 100644
--- a/python/src/pywy/dataquanta.py
+++ b/python/src/pywy/dataquanta.py
@@ -1,3 +1,6 @@
+from typing import Set
+
+from pywy.translate.translator import Translator
 from pywy.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO)
 from pywy.wayangplan import *
 from pywy.wayangplan.wayang import PywyPlan
@@ -7,13 +10,15 @@ class WayangContext:
   """
   This is the entry point for users to work with Wayang.
   """
+  plugins: Set[Plugin]
+
   def __init__(self):
     self.plugins = set()
 
   """
   add a :class:`Plugin` to the :class:`Context`
   """
-  def register(self, *p: Plugin):
+  def register(self, p: Plugin):
     self.plugins.add(p)
     return self
 
@@ -28,7 +33,7 @@ class WayangContext:
     return DataQuanta(self, TextFileSource(file_path))
 
   def __str__(self):
-    return "Plugins: {} \n".format(str(self.plugins))
+    return "Plugins: {}".format(str(self.plugins))
 
   def __repr__(self):
     return self.__str__()
@@ -38,6 +43,7 @@ class DataQuanta(GenericTco):
     Represents an intermediate result/data flow edge in a [[WayangPlan]].
     """
     previous : WyOperator = None
+    context: WayangContext
 
     def __init__(self, context:WayangContext,  operator: WyOperator):
         self.operator = operator
@@ -55,8 +61,9 @@ class DataQuanta(GenericTco):
     def storeTextFile(self: "DataQuanta[I]", path: str) :
         last = self.__connect(TextFileSink(path))
         plan = PywyPlan(self.context.plugins, [last])
-        #plan.print()
-        plan.printTuple()
+
+        trs: Translator =  Translator(self.context.plugins.pop(), plan)
+        trs.translate()
         # TODO add the logic to execute the plan
 
     def __connect(self, op:WyOperator, port_op: int = 0) -> WyOperator:
diff --git a/python/src/pywy/graph/graph.py b/python/src/pywy/graph/graph.py
index c15c86e5..6e0a2d08 100644
--- a/python/src/pywy/graph/graph.py
+++ b/python/src/pywy/graph/graph.py
@@ -35,7 +35,7 @@ class GraphNode(Generic[T]):
     def visit(self, parent: 'GraphNode[T]', udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any], visit_status: bool = True):
         if(self.visited == visit_status):
             return
-        self.visited = visit_status
+        self.visited != visit_status
         return udf(self, parent)
 
 
@@ -59,9 +59,10 @@ class WayangGraph(Generic[T]):
             self,
             origin: GraphNode[T],
             nodes: Iterable[GraphNode[T]],
-            udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any]
+            udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any],
+            visit_status: bool = True
     ):
         for node in nodes:
             adjacents = node.adjacents(self.created_nodes)
-            self.traversal(node, adjacents, udf)
+            self.traversal(node, adjacents, udf, visit_status)
             node.visit(origin, udf)
\ No newline at end of file
diff --git a/python/src/pywy/graph/graphtypes.py b/python/src/pywy/graph/graphtypes.py
index 005ef5f4..cef0ff06 100644
--- a/python/src/pywy/graph/graphtypes.py
+++ b/python/src/pywy/graph/graphtypes.py
@@ -26,24 +26,30 @@ class WGraphOfOperator(WayangGraph[NodeOperator]):
         return NodeOperator(t)
 
 
-class NodeTuple(GraphNode[Tuple[WyOperator, WyOperator]]):
+class NodeVec(GraphNode[List[WyOperator]]):
 
     def __init__(self, op: WyOperator):
-        super(NodeTuple, self).__init__((op, None))
+        super(NodeVec, self).__init__([op, None])
 
-    def getadjacents(self) -> Iterable[Tuple[WyOperator, WyOperator]]:
+    def getadjacents(self) -> Iterable[List[WyOperator]]:
         operator: WyOperator = self.current[0]
         if operator is None or operator.inputs == 0:
             return []
         return operator.inputOperator
 
-    def build_node(self, t:WyOperator) -> 'NodeTuple':
-        return NodeTuple(t)
+    def build_node(self, t:WyOperator) -> 'NodeVec':
+        return NodeVec(t)
 
-class WGraphOfTuple(WayangGraph[NodeTuple]):
+    def __str__(self):
+        return "NodeVec {}".format(self.current)
+
+    def __repr__(self):
+        return self.__str__()
+
+class WGraphOfVec(WayangGraph[NodeVec]):
 
     def __init__(self, nodes: List[WyOperator]):
-        super(WGraphOfTuple, self).__init__(nodes)
+        super(WGraphOfVec, self).__init__(nodes)
 
-    def build_node(self, t:WyOperator) -> NodeTuple:
-        return NodeTuple(t)
\ No newline at end of file
+    def build_node(self, t:WyOperator) -> NodeVec:
+        return NodeVec(t)
\ No newline at end of file
diff --git a/python/src/pywy/platforms/basic/mapping.py b/python/src/pywy/platforms/basic/mapping.py
index 63c7e4ca..e8f14a20 100644
--- a/python/src/pywy/platforms/basic/mapping.py
+++ b/python/src/pywy/platforms/basic/mapping.py
@@ -8,10 +8,10 @@ class Mapping:
         self.mappings = {}
 
     def add_mapping(self, operator: WyOperator):
-        self.mappings[operator.name] = type(operator)
+        self.mappings[operator.name_basic()] = type(operator)
 
     def get_instanceof(self, operator: WyOperator):
-        template = self.mappings[operator.name]
+        template = self.mappings[operator.name_basic()]
         if template is None:
             raise Exception(
                 "the operator {} does not have valid mapping".format(
diff --git a/python/src/pywy/platforms/basic/plugin.py b/python/src/pywy/platforms/basic/plugin.py
index 9176e9f3..88da7f73 100644
--- a/python/src/pywy/platforms/basic/plugin.py
+++ b/python/src/pywy/platforms/basic/plugin.py
@@ -1,3 +1,5 @@
+from typing import List, Set
+
 from pywy.platforms.basic.platform import Platform
 from pywy.platforms.basic.mapping import Mapping
 
@@ -10,18 +12,18 @@ class Plugin:
     In turn, it may require several :clas:`Platform`s for its operation.
     """
 
-    platforms = []
+    platforms: Set[Platform]
     mappings: Mapping
 
-    def __init__(self, *platform:Platform, mappings: Mapping = Mapping()):
-        self.platforms = list(platform)
+    def __init__(self, platforms:Set[Platform], mappings: Mapping = Mapping()):
+        self.platforms = platforms
         self.mappings = mappings
 
     def get_mappings(self) -> Mapping:
         return self.mappings
 
     def __str__(self):
-        return "Platforms: {}".format(str(self.platforms))
+        return "Platforms: {}, Mappings: {}".format(str(self.platforms), str(self.mappings))
 
     def __repr__(self):
         return self.__str__()
diff --git a/python/src/pywy/platforms/python/plugin/plugin.py b/python/src/pywy/platforms/python/plugin/plugin.py
index 0f870029..0d42db7a 100644
--- a/python/src/pywy/platforms/python/plugin/plugin.py
+++ b/python/src/pywy/platforms/python/plugin/plugin.py
@@ -7,4 +7,4 @@ from pywy.platforms.python.mappings import PywyOperatorMappings
 class PythonPlugin(Plugin):
 
     def __init__(self):
-        super(PythonPlugin, self).__init__(PythonPlatform(), PywyOperatorMappings)
\ No newline at end of file
+        super(PythonPlugin, self).__init__({PythonPlatform()}, PywyOperatorMappings)
\ No newline at end of file
diff --git a/python/src/pywy/test.py b/python/src/pywy/test.py
index de0153d6..2de508e6 100644
--- a/python/src/pywy/test.py
+++ b/python/src/pywy/test.py
@@ -1,37 +1,37 @@
 from pywy.platforms.basic.platform import Platform
 from pywy.dataquanta import WayangContext
 from pywy.platforms.python.channels import Channel
-from pywy.plugins import java, spark
+from pywy.plugins import java, spark, python
 from pywy.wayangplan.unary import *
 
-p = Platform("nana")
-print("LALA "+str(p))
-pt = type(p)
-print(pt)
-p2 = pt("chao")
-print(p2)
-print(type(p2))
-
-
-print(str(WayangContext().register(java, spark)))
+# p = Platform("nana")
+# print("LALA "+str(p))
+# pt = type(p)
+# print(pt)
+# p2 = pt("chao")
+# print(p2)
+# print(type(p2))
+#
+#
+# print(str(WayangContext().register(java, spark)))
 
 from pywy.types import Predicate, getTypePredicate
-
-predicate : Predicate = lambda x: x % 2 == 0
-getTypePredicate(predicate)
+#
+# predicate : Predicate = lambda x: x % 2 == 0
+# getTypePredicate(predicate)
 
 def pre(a:str):
     return len(a) > 3
-
-def func(s:str) -> int:
-    return len(s)
-
-def fmfunc(i:int) -> str:
-    for x in range(i):
-        yield str(x)
+#
+# def func(s:str) -> int:
+#     return len(s)
+#
+# def fmfunc(i:int) -> str:
+#     for x in range(i):
+#         yield str(x)
 
 fileop = WayangContext()\
-            .register(java)\
+            .register(python)\
             .textFile("/Users/bertty/databloom/blossom/python/resources/test.input")\
             .filter(pre)\
             .storeTextFile("/Users/bertty/databloom/blossom/python/resources/test.output")
diff --git a/python/src/pywy/translate/translator.py b/python/src/pywy/translate/translator.py
index 8ec29399..bb646c4f 100644
--- a/python/src/pywy/translate/translator.py
+++ b/python/src/pywy/translate/translator.py
@@ -1,12 +1,56 @@
+from pywy.graph.graphtypes import WGraphOfVec, NodeVec
 from pywy.platforms.basic.plugin import Plugin
+from pywy.wayangplan import WyOperator
 from pywy.wayangplan.wayang import PywyPlan
 from pywy.platforms.basic.mapping import Mapping
 
 class Translator:
 
+    plugin: Plugin
+    plan : PywyPlan
+
     def __init__(self, plugin: Plugin, plan: PywyPlan):
         self.plugin = plugin
         self.plan = plan
 
     def translate(self):
         mappings:Mapping = self.plugin.get_mappings()
+        graph = WGraphOfVec(self.plan.sinks)
+        def translate2plugin(current: NodeVec, previous: NodeVec):
+            if current is None:
+                return
+
+            if current.current[1] is None:
+                current.current[1] = mappings.get_instanceof(current.current[0])
+
+            if previous is None:
+                return
+            if previous.current[1] is None:
+                previous.current[1] = mappings.get_instanceof(previous.current[0])
+
+            # TODO not necesary it it 0
+            current.current[1].connect(0, previous.current[1], 0)
+
+        graph.traversal(None, graph.starting_nodes, translate2plugin)
+
+        def print_plan(current: NodeVec, previous: NodeVec):
+            if current is None:
+                print("this is source")
+                print(previous.current)
+                return
+            if previous is None:
+                print("this is sink")
+                print(current.current)
+                return
+
+            print(
+                "############\n{}\n@@@@@ => previous is\n{}\n############\n"
+                    .format(
+                        current.current,
+                        previous.current
+                     )
+            )
+
+        graph.traversal(None, graph.starting_nodes, print_plan, False)
+        print("here")
+
diff --git a/python/src/pywy/wayangplan/base.py b/python/src/pywy/wayangplan/base.py
index a87a2f6a..92d4dfca 100644
--- a/python/src/pywy/wayangplan/base.py
+++ b/python/src/pywy/wayangplan/base.py
@@ -63,6 +63,11 @@ class WyOperator:
     def postfix(self) -> str:
         return ''
 
+    def name_basic(self, with_prefix: bool = False, with_postfix:bool = True):
+        prefix = len(self.prefix()) if not with_prefix else 0
+        postfix = len(self.postfix()) if not with_postfix else 0
+        return self.name[prefix:len(self.name) - postfix]
+
     def __str__(self):
         return "BaseOperator: \n\t- name: {}\n\t- inputs: {} {}\n\t- outputs: {} {} \n".format(
             str(self.name),
diff --git a/python/src/pywy/wayangplan/wayang.py b/python/src/pywy/wayangplan/wayang.py
index 220f4416..c4d8205d 100644
--- a/python/src/pywy/wayangplan/wayang.py
+++ b/python/src/pywy/wayangplan/wayang.py
@@ -1,7 +1,7 @@
 from typing import Iterable, Set
 
 from pywy.graph.graph import WayangGraph
-from pywy.graph.graphtypes import WGraphOfOperator, NodeOperator, WGraphOfTuple, NodeTuple
+from pywy.graph.graphtypes import WGraphOfOperator, NodeOperator, WGraphOfVec, NodeVec
 from pywy.wayangplan.sink import SinkOperator
 from pywy.platforms.basic.plugin import Plugin
 
@@ -16,7 +16,7 @@ class PywyPlan:
         self.set_graph()
 
     def set_graph(self):
-        self.graph = WGraphOfTuple(self.sinks)
+        self.graph = WGraphOfVec(self.sinks)
 
     def print(self):
         def print_plan(current: NodeOperator, previous: NodeOperator):
@@ -40,7 +40,7 @@ class PywyPlan:
 
 
     def printTuple(self):
-        def print_plan(current: NodeTuple, previous: NodeTuple):
+        def print_plan(current: NodeVec, previous: NodeVec):
             if current is None:
                 print("this is source")
                 print(previous.current)