You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:30 UTC

[incubator-wayang] 09/32: [WAYANG-#8] Change structure of Graph

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit bff9f575f244cda75bd5b05f38f5c3224ca2ec24
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Wed Apr 6 16:39:17 2022 +0200

    [WAYANG-#8] Change structure of Graph
    
    Signed-off-by: bertty <be...@apache.org>
---
 python/src/pywy/__init__.py                       |   2 +-
 python/src/pywy/graph/__init__.py                 |  19 ----
 python/src/pywy/graph/graph.py                    | 124 +++++++++++-----------
 python/src/pywy/graph/graphtypes.py               |  26 +++++
 python/src/pywy/{graph => old_graph}/__init__.py  |   0
 python/src/pywy/{graph => old_graph}/graph.py     |   2 +-
 python/src/pywy/{graph => old_graph}/node.py      |   0
 python/src/pywy/{graph => old_graph}/traversal.py |   2 +-
 python/src/pywy/{graph => old_graph}/visitant.py  |   0
 python/src/pywy/orchestrator/dataquanta.py        |   4 +-
 python/src/pywy/wayangplan/wayang.py              |  78 ++------------
 11 files changed, 100 insertions(+), 157 deletions(-)

diff --git a/python/src/pywy/__init__.py b/python/src/pywy/__init__.py
index 38c001b3..39f92eb3 100644
--- a/python/src/pywy/__init__.py
+++ b/python/src/pywy/__init__.py
@@ -18,5 +18,5 @@
 from .config import *
 from .orchestrator import *
 from pywy.translate.protobuf import *
-from .graph import *
+from .old_graph import *
 from .test import *
\ No newline at end of file
diff --git a/python/src/pywy/graph/__init__.py b/python/src/pywy/graph/__init__.py
index 8066b5ee..e69de29b 100644
--- a/python/src/pywy/graph/__init__.py
+++ b/python/src/pywy/graph/__init__.py
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#import graph.graph
-#import graph.node
\ No newline at end of file
diff --git a/python/src/pywy/graph/graph.py b/python/src/pywy/graph/graph.py
index a66787fa..c15c86e5 100644
--- a/python/src/pywy/graph/graph.py
+++ b/python/src/pywy/graph/graph.py
@@ -1,71 +1,67 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from pywy.graph.node import Node
-import logging
-
-
-# Adjacency Matrix used to analise the plan
-class Graph:
-    def __init__(self):
-        self.graph = {}
-        self.nodes_no = 0
-        self.nodes = []
-
-    # Fills the Graph
-    def populate(self, sinks):
-        for sink in iter(sinks):
-            self.process_operator(sink)
-
-    # Add current operator and set dependencies
-    def process_operator(self, operator):
-        self.add_node(operator.operator_type, operator.id, operator)
-
-        if len(operator.previous) > 0:
-            for parent in operator.previous:
-                if parent:
-                    self.add_node(parent.operator_type, parent.id, parent)
-                    self.add_link(operator.id, parent.id, 1)
-                    self.process_operator(parent)
-
-    def add_node(self, name, id, operator):
-        if id in self.nodes:
+from pywy.types import T
+from typing import Iterable, Dict, Callable, List, Any, Generic
+
+
+class GraphNode(Generic[T]):
+
+    current: T
+    visited: bool
+
+    def __init__(self, op: T):
+        self.current = op
+        self.visited = False
+
+    def getadjacents(self) -> Iterable[T]:
+        pass
+
+    def build_node(self, t:T) -> 'GraphNode[T]':
+        pass
+
+    def adjacents(self, created: Dict[T, 'GraphNode[T]']) -> Iterable['GraphNode[T]']:
+        adjacent = self.getadjacents()
+
+        if len(adjacent) == 0:
+            return []
+
+        def wrap(op:T):
+            if op is None:
+                return None
+            if op not in created:
+                created[op] = self.build_node(op)
+            return created[op]
+
+        return map(wrap, adjacent)
+
+    def visit(self, parent: 'GraphNode[T]', udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any], visit_status: bool = True):
+        if(self.visited == visit_status):
             return
+        self.visited = visit_status
+        return udf(self, parent)
 
-        self.nodes_no += 1
-        self.nodes.append(id)
-        new_node = Node(name, id, operator)
 
-        self.graph[id] = new_node
+class WayangGraph(Generic[T]):
 
-    def add_link(self, id_child, id_parent, e):
-        if id_child in self.nodes:
-            if id_parent in self.nodes:
-                self.graph[id_child].add_predecessor(id_parent, e)
-                self.graph[id_parent].add_successor(id_child, e)
+    starting_nodes : List[GraphNode[T]]
+    created_nodes : Dict[T, GraphNode[T]]
 
-    def print_adjlist(self):
+    def __init__(self, nodes: List[T]):
+        self.created_nodes = {}
+        self.starting_nodes = list()
+        for node in nodes:
+            tmp = self.build_node(node)
+            self.starting_nodes.append(tmp)
+            self.created_nodes[node] = tmp
 
-        for key in self.graph:
-            logging.debug("Node: ", self.graph[key].operator_type, " - ", key)
-            for key2 in self.graph[key].predecessors:
-                logging.debug("- Parent: ", self.graph[key2].operator_type, " - ", self.graph[key].predecessors[key2], " - ", key2)
-            for key2 in self.graph[key].successors:
-                logging.debug("- Child: ", self.graph[key2].operator_type, " - ", self.graph[key].successors[key2], " - ", key2)
+    def build_node(self, t:T) -> GraphNode[T]:
+        pass
 
-    def get_node(self, id):
-        return self.graph[id]
+    def traversal(
+            self,
+            origin: GraphNode[T],
+            nodes: Iterable[GraphNode[T]],
+            udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any]
+    ):
+        for node in nodes:
+            adjacents = node.adjacents(self.created_nodes)
+            self.traversal(node, adjacents, udf)
+            node.visit(origin, udf)
\ No newline at end of file
diff --git a/python/src/pywy/graph/graphtypes.py b/python/src/pywy/graph/graphtypes.py
new file mode 100644
index 00000000..d31b4cde
--- /dev/null
+++ b/python/src/pywy/graph/graphtypes.py
@@ -0,0 +1,26 @@
+from typing import Iterable, List
+
+from pywy.graph.graph import GraphNode, WayangGraph
+from pywy.wayangplan.base import WyOperator
+
+class WayangNode(GraphNode[WyOperator]):
+
+    def __init__(self, op: WyOperator):
+        super(WayangNode, self).__init__(op)
+
+    def getadjacents(self) -> Iterable[WyOperator]:
+        operator: WyOperator = self.current
+        if operator is None or operator.inputs == 0:
+            return []
+        return operator.inputOperator
+
+    def build_node(self, t:WyOperator) -> 'WayangNode':
+        return WayangNode(t)
+
+class WayangGraphOfWayangNode(WayangGraph[WayangNode]):
+
+    def __init__(self, nodes: List[WyOperator]):
+        super(WayangGraphOfWayangNode, self).__init__(nodes)
+
+    def build_node(self, t:WyOperator) -> WayangNode:
+        return WayangNode(t)
diff --git a/python/src/pywy/graph/__init__.py b/python/src/pywy/old_graph/__init__.py
similarity index 100%
copy from python/src/pywy/graph/__init__.py
copy to python/src/pywy/old_graph/__init__.py
diff --git a/python/src/pywy/graph/graph.py b/python/src/pywy/old_graph/graph.py
similarity index 98%
copy from python/src/pywy/graph/graph.py
copy to python/src/pywy/old_graph/graph.py
index a66787fa..13846c48 100644
--- a/python/src/pywy/graph/graph.py
+++ b/python/src/pywy/old_graph/graph.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from pywy.graph.node import Node
+from pywy.old_graph.node import Node
 import logging
 
 
diff --git a/python/src/pywy/graph/node.py b/python/src/pywy/old_graph/node.py
similarity index 100%
rename from python/src/pywy/graph/node.py
rename to python/src/pywy/old_graph/node.py
diff --git a/python/src/pywy/graph/traversal.py b/python/src/pywy/old_graph/traversal.py
similarity index 97%
rename from python/src/pywy/graph/traversal.py
rename to python/src/pywy/old_graph/traversal.py
index 63542a4e..a2714145 100644
--- a/python/src/pywy/graph/traversal.py
+++ b/python/src/pywy/old_graph/traversal.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from pywy.graph.visitant import Visitant
+from pywy.old_graph.visitant import Visitant
 import logging
 
 
diff --git a/python/src/pywy/graph/visitant.py b/python/src/pywy/old_graph/visitant.py
similarity index 100%
rename from python/src/pywy/graph/visitant.py
rename to python/src/pywy/old_graph/visitant.py
diff --git a/python/src/pywy/orchestrator/dataquanta.py b/python/src/pywy/orchestrator/dataquanta.py
index 4e5a5661..8c4468b2 100644
--- a/python/src/pywy/orchestrator/dataquanta.py
+++ b/python/src/pywy/orchestrator/dataquanta.py
@@ -16,8 +16,8 @@
 #
 
 from pywy.orchestrator.operator import Operator
-from pywy.graph.graph import Graph
-from pywy.graph.traversal import Traversal
+from pywy.old_graph.graph import Graph
+from pywy.old_graph.traversal import Traversal
 from pywy.translate.protobuf.planwriter import MessageWriter
 import itertools
 import collections
diff --git a/python/src/pywy/wayangplan/wayang.py b/python/src/pywy/wayangplan/wayang.py
index 7da13e18..eacad67c 100644
--- a/python/src/pywy/wayangplan/wayang.py
+++ b/python/src/pywy/wayangplan/wayang.py
@@ -1,85 +1,25 @@
-from typing import Iterable, Dict, Callable, NoReturn, List, Set
+from typing import Iterable, Set
 
+from pywy.graph.graph import WayangGraph
+from pywy.graph.graphtypes import WayangGraphOfWayangNode, WayangNode
 from pywy.wayangplan.sink import SinkOperator
-from pywy.wayangplan.base import WyOperator
 from pywy.platforms.basic.plugin import Plugin
 
-class GraphNodeWayang:
-
-    current: WyOperator
-    visited: bool
-
-    def __init__(self, op: WyOperator):
-        self.current = op
-        self.visited = False
-
-    def successors(self, created: Dict[WyOperator, 'GraphNodeWayang']) -> Iterable['GraphNodeWayang']:
-        if self.current is None or self.current.outputs == 0:
-            return []
-
-        def wrap(op:WyOperator):
-            if op is None:
-                return None;
-            if op not in created:
-                created[op] = GraphNodeWayang(op)
-            return created[op]
-
-        adjacent = self.current.outputOperator
-        return map(wrap, adjacent)
-
-    def predecessors(self, created: Dict[WyOperator, 'GraphNodeWayang']) -> Iterable['GraphNodeWayang']:
-        print("predecessors")
-        print(self)
-        def wrap(op:WyOperator):
-            if op not in created:
-                created[op] = GraphNodeWayang(op)
-            return created[op]
-
-        adjacent = self.current.inputOperator
-        return map(wrap, adjacent)
-
-    def visit(self, parent: 'GraphNodeWayang', udf: Callable[['GraphNodeWayang', 'GraphNodeWayang'], NoReturn], visit_status: bool = True):
-        if(self.visited == visit_status):
-            return
-        udf(self, parent)
-        self.visited = visit_status
-
-class GraphWayang:
-
-    starting_nodes : List[GraphNodeWayang]
-    created_nodes : Dict[WyOperator, GraphNodeWayang]
-
-    def __init__(self, plan:'PywyPlan'):
-        self.created_nodes = {}
-        self.starting_nodes = list()
-        for sink in plan.sinks:
-            tmp = GraphNodeWayang(sink)
-            self.starting_nodes.append(tmp)
-            self.created_nodes[sink] = tmp
-
-
-    def traversal(
-            self,
-            origin: GraphNodeWayang,
-            nodes: Iterable[GraphNodeWayang],
-            udf: Callable[['GraphNodeWayang', 'GraphNodeWayang'], NoReturn]
-    ):
-        for node in nodes:
-            adjacents = node.predecessors(self.created_nodes)
-            self.traversal(node, adjacents, udf)
-            node.visit(origin, udf)
 
 class PywyPlan:
 
-    graph: GraphWayang
+    graph: WayangGraph
 
     def __init__(self, plugins: Set[Plugin], sinks: Iterable[SinkOperator]):
         self.plugins = plugins
         self.sinks = sinks
-        self.graph = GraphWayang(self)
+        self.set_graph()
+
+    def set_graph(self):
+        self.graph = WayangGraphOfWayangNode(self.sinks)
 
     def print(self):
-        def print_plan(current: GraphNodeWayang, previous: GraphNodeWayang):
+        def print_plan(current: WayangNode, previous: WayangNode):
             if current is None:
                 print("this is source")
                 print(previous.current)