You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:30 UTC
[incubator-wayang] 09/32: [WAYANG-#8] Change structure of Graph
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit bff9f575f244cda75bd5b05f38f5c3224ca2ec24
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Wed Apr 6 16:39:17 2022 +0200
[WAYANG-#8] Change structure of Graph
Signed-off-by: bertty <be...@apache.org>
---
python/src/pywy/__init__.py | 2 +-
python/src/pywy/graph/__init__.py | 19 ----
python/src/pywy/graph/graph.py | 124 +++++++++++-----------
python/src/pywy/graph/graphtypes.py | 26 +++++
python/src/pywy/{graph => old_graph}/__init__.py | 0
python/src/pywy/{graph => old_graph}/graph.py | 2 +-
python/src/pywy/{graph => old_graph}/node.py | 0
python/src/pywy/{graph => old_graph}/traversal.py | 2 +-
python/src/pywy/{graph => old_graph}/visitant.py | 0
python/src/pywy/orchestrator/dataquanta.py | 4 +-
python/src/pywy/wayangplan/wayang.py | 78 ++------------
11 files changed, 100 insertions(+), 157 deletions(-)
diff --git a/python/src/pywy/__init__.py b/python/src/pywy/__init__.py
index 38c001b3..39f92eb3 100644
--- a/python/src/pywy/__init__.py
+++ b/python/src/pywy/__init__.py
@@ -18,5 +18,5 @@
from .config import *
from .orchestrator import *
from pywy.translate.protobuf import *
-from .graph import *
+from .old_graph import *
from .test import *
\ No newline at end of file
diff --git a/python/src/pywy/graph/__init__.py b/python/src/pywy/graph/__init__.py
index 8066b5ee..e69de29b 100644
--- a/python/src/pywy/graph/__init__.py
+++ b/python/src/pywy/graph/__init__.py
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#import graph.graph
-#import graph.node
\ No newline at end of file
diff --git a/python/src/pywy/graph/graph.py b/python/src/pywy/graph/graph.py
index a66787fa..c15c86e5 100644
--- a/python/src/pywy/graph/graph.py
+++ b/python/src/pywy/graph/graph.py
@@ -1,71 +1,67 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from pywy.graph.node import Node
-import logging
-
-
-# Adjacency Matrix used to analise the plan
-class Graph:
- def __init__(self):
- self.graph = {}
- self.nodes_no = 0
- self.nodes = []
-
- # Fills the Graph
- def populate(self, sinks):
- for sink in iter(sinks):
- self.process_operator(sink)
-
- # Add current operator and set dependencies
- def process_operator(self, operator):
- self.add_node(operator.operator_type, operator.id, operator)
-
- if len(operator.previous) > 0:
- for parent in operator.previous:
- if parent:
- self.add_node(parent.operator_type, parent.id, parent)
- self.add_link(operator.id, parent.id, 1)
- self.process_operator(parent)
-
- def add_node(self, name, id, operator):
- if id in self.nodes:
+from pywy.types import T
+from typing import Iterable, Dict, Callable, List, Any, Generic
+
+
+class GraphNode(Generic[T]):
+
+ current: T
+ visited: bool
+
+ def __init__(self, op: T):
+ self.current = op
+ self.visited = False
+
+ def getadjacents(self) -> Iterable[T]:
+ pass
+
+ def build_node(self, t:T) -> 'GraphNode[T]':
+ pass
+
+ def adjacents(self, created: Dict[T, 'GraphNode[T]']) -> Iterable['GraphNode[T]']:
+ adjacent = self.getadjacents()
+
+ if len(adjacent) == 0:
+ return []
+
+ def wrap(op:T):
+ if op is None:
+ return None
+ if op not in created:
+ created[op] = self.build_node(op)
+ return created[op]
+
+ return map(wrap, adjacent)
+
+ def visit(self, parent: 'GraphNode[T]', udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any], visit_status: bool = True):
+ if(self.visited == visit_status):
return
+ self.visited = visit_status
+ return udf(self, parent)
- self.nodes_no += 1
- self.nodes.append(id)
- new_node = Node(name, id, operator)
- self.graph[id] = new_node
+class WayangGraph(Generic[T]):
- def add_link(self, id_child, id_parent, e):
- if id_child in self.nodes:
- if id_parent in self.nodes:
- self.graph[id_child].add_predecessor(id_parent, e)
- self.graph[id_parent].add_successor(id_child, e)
+ starting_nodes : List[GraphNode[T]]
+ created_nodes : Dict[T, GraphNode[T]]
- def print_adjlist(self):
+ def __init__(self, nodes: List[T]):
+ self.created_nodes = {}
+ self.starting_nodes = list()
+ for node in nodes:
+ tmp = self.build_node(node)
+ self.starting_nodes.append(tmp)
+ self.created_nodes[node] = tmp
- for key in self.graph:
- logging.debug("Node: ", self.graph[key].operator_type, " - ", key)
- for key2 in self.graph[key].predecessors:
- logging.debug("- Parent: ", self.graph[key2].operator_type, " - ", self.graph[key].predecessors[key2], " - ", key2)
- for key2 in self.graph[key].successors:
- logging.debug("- Child: ", self.graph[key2].operator_type, " - ", self.graph[key].successors[key2], " - ", key2)
+ def build_node(self, t:T) -> GraphNode[T]:
+ pass
- def get_node(self, id):
- return self.graph[id]
+ def traversal(
+ self,
+ origin: GraphNode[T],
+ nodes: Iterable[GraphNode[T]],
+ udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any]
+ ):
+ for node in nodes:
+ adjacents = node.adjacents(self.created_nodes)
+ self.traversal(node, adjacents, udf)
+ node.visit(origin, udf)
\ No newline at end of file
diff --git a/python/src/pywy/graph/graphtypes.py b/python/src/pywy/graph/graphtypes.py
new file mode 100644
index 00000000..d31b4cde
--- /dev/null
+++ b/python/src/pywy/graph/graphtypes.py
@@ -0,0 +1,26 @@
+from typing import Iterable, List
+
+from pywy.graph.graph import GraphNode, WayangGraph
+from pywy.wayangplan.base import WyOperator
+
+class WayangNode(GraphNode[WyOperator]):
+
+ def __init__(self, op: WyOperator):
+ super(WayangNode, self).__init__(op)
+
+ def getadjacents(self) -> Iterable[WyOperator]:
+ operator: WyOperator = self.current
+ if operator is None or operator.inputs == 0:
+ return []
+ return operator.inputOperator
+
+ def build_node(self, t:WyOperator) -> 'WayangNode':
+ return WayangNode(t)
+
+class WayangGraphOfWayangNode(WayangGraph[WayangNode]):
+
+ def __init__(self, nodes: List[WyOperator]):
+ super(WayangGraphOfWayangNode, self).__init__(nodes)
+
+ def build_node(self, t:WyOperator) -> WayangNode:
+ return WayangNode(t)
diff --git a/python/src/pywy/graph/__init__.py b/python/src/pywy/old_graph/__init__.py
similarity index 100%
copy from python/src/pywy/graph/__init__.py
copy to python/src/pywy/old_graph/__init__.py
diff --git a/python/src/pywy/graph/graph.py b/python/src/pywy/old_graph/graph.py
similarity index 98%
copy from python/src/pywy/graph/graph.py
copy to python/src/pywy/old_graph/graph.py
index a66787fa..13846c48 100644
--- a/python/src/pywy/graph/graph.py
+++ b/python/src/pywy/old_graph/graph.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
-from pywy.graph.node import Node
+from pywy.old_graph.node import Node
import logging
diff --git a/python/src/pywy/graph/node.py b/python/src/pywy/old_graph/node.py
similarity index 100%
rename from python/src/pywy/graph/node.py
rename to python/src/pywy/old_graph/node.py
diff --git a/python/src/pywy/graph/traversal.py b/python/src/pywy/old_graph/traversal.py
similarity index 97%
rename from python/src/pywy/graph/traversal.py
rename to python/src/pywy/old_graph/traversal.py
index 63542a4e..a2714145 100644
--- a/python/src/pywy/graph/traversal.py
+++ b/python/src/pywy/old_graph/traversal.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
-from pywy.graph.visitant import Visitant
+from pywy.old_graph.visitant import Visitant
import logging
diff --git a/python/src/pywy/graph/visitant.py b/python/src/pywy/old_graph/visitant.py
similarity index 100%
rename from python/src/pywy/graph/visitant.py
rename to python/src/pywy/old_graph/visitant.py
diff --git a/python/src/pywy/orchestrator/dataquanta.py b/python/src/pywy/orchestrator/dataquanta.py
index 4e5a5661..8c4468b2 100644
--- a/python/src/pywy/orchestrator/dataquanta.py
+++ b/python/src/pywy/orchestrator/dataquanta.py
@@ -16,8 +16,8 @@
#
from pywy.orchestrator.operator import Operator
-from pywy.graph.graph import Graph
-from pywy.graph.traversal import Traversal
+from pywy.old_graph.graph import Graph
+from pywy.old_graph.traversal import Traversal
from pywy.translate.protobuf.planwriter import MessageWriter
import itertools
import collections
diff --git a/python/src/pywy/wayangplan/wayang.py b/python/src/pywy/wayangplan/wayang.py
index 7da13e18..eacad67c 100644
--- a/python/src/pywy/wayangplan/wayang.py
+++ b/python/src/pywy/wayangplan/wayang.py
@@ -1,85 +1,25 @@
-from typing import Iterable, Dict, Callable, NoReturn, List, Set
+from typing import Iterable, Set
+from pywy.graph.graph import WayangGraph
+from pywy.graph.graphtypes import WayangGraphOfWayangNode, WayangNode
from pywy.wayangplan.sink import SinkOperator
-from pywy.wayangplan.base import WyOperator
from pywy.platforms.basic.plugin import Plugin
-class GraphNodeWayang:
-
- current: WyOperator
- visited: bool
-
- def __init__(self, op: WyOperator):
- self.current = op
- self.visited = False
-
- def successors(self, created: Dict[WyOperator, 'GraphNodeWayang']) -> Iterable['GraphNodeWayang']:
- if self.current is None or self.current.outputs == 0:
- return []
-
- def wrap(op:WyOperator):
- if op is None:
- return None;
- if op not in created:
- created[op] = GraphNodeWayang(op)
- return created[op]
-
- adjacent = self.current.outputOperator
- return map(wrap, adjacent)
-
- def predecessors(self, created: Dict[WyOperator, 'GraphNodeWayang']) -> Iterable['GraphNodeWayang']:
- print("predecessors")
- print(self)
- def wrap(op:WyOperator):
- if op not in created:
- created[op] = GraphNodeWayang(op)
- return created[op]
-
- adjacent = self.current.inputOperator
- return map(wrap, adjacent)
-
- def visit(self, parent: 'GraphNodeWayang', udf: Callable[['GraphNodeWayang', 'GraphNodeWayang'], NoReturn], visit_status: bool = True):
- if(self.visited == visit_status):
- return
- udf(self, parent)
- self.visited = visit_status
-
-class GraphWayang:
-
- starting_nodes : List[GraphNodeWayang]
- created_nodes : Dict[WyOperator, GraphNodeWayang]
-
- def __init__(self, plan:'PywyPlan'):
- self.created_nodes = {}
- self.starting_nodes = list()
- for sink in plan.sinks:
- tmp = GraphNodeWayang(sink)
- self.starting_nodes.append(tmp)
- self.created_nodes[sink] = tmp
-
-
- def traversal(
- self,
- origin: GraphNodeWayang,
- nodes: Iterable[GraphNodeWayang],
- udf: Callable[['GraphNodeWayang', 'GraphNodeWayang'], NoReturn]
- ):
- for node in nodes:
- adjacents = node.predecessors(self.created_nodes)
- self.traversal(node, adjacents, udf)
- node.visit(origin, udf)
class PywyPlan:
- graph: GraphWayang
+ graph: WayangGraph
def __init__(self, plugins: Set[Plugin], sinks: Iterable[SinkOperator]):
self.plugins = plugins
self.sinks = sinks
- self.graph = GraphWayang(self)
+ self.set_graph()
+
+ def set_graph(self):
+ self.graph = WayangGraphOfWayangNode(self.sinks)
def print(self):
- def print_plan(current: GraphNodeWayang, previous: GraphNodeWayang):
+ def print_plan(current: WayangNode, previous: WayangNode):
if current is None:
print("this is source")
print(previous.current)