You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/13 17:33:39 UTC

incubator-ariatosca git commit: graph work

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-278-Remove-core-tasks 979a4b445 -> 092b45f0d


graph work


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/092b45f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/092b45f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/092b45f0

Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 092b45f0de707cc9c1ed8bebe594bb0b7bb5846f
Parents: 979a4b4
Author: max-orlov <ma...@gigaspaces.com>
Authored: Tue Jun 13 20:33:34 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue Jun 13 20:33:34 2017 +0300

----------------------------------------------------------------------
 aria/modeling/orchestration.py                  | 10 +--
 aria/orchestrator/workflows/core/engine.py      | 16 ++--
 aria/orchestrator/workflows/core/translation.py | 93 +++++++++-----------
 .../test_task_graph_into_execution_graph.py     | 13 +--
 .../workflows/executor/test_process_executor.py |  1 -
 5 files changed, 60 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/092b45f0/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 37aa431..4a5771a 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -400,14 +400,14 @@ class TaskBase(mixins.ModelMixin):
     def retry(message=None, retry_interval=None):
         raise TaskRetryException(message, retry_interval=retry_interval)
 
-    @declared_attr
-    def operation_task_dependency_fk(cls):
-        """For Type one-to-many to Type"""
-        return relationship.foreign_key('task', nullable=True)
+    # @declared_attr
+    # def dependency_fk(cls):
+    #     """For Type one-to-many to Type"""
+    #     return relationship.foreign_key('task', nullable=True)
 
     @declared_attr
     def dependent_tasks(cls):
-        return relationship.one_to_many_self(cls, 'operation_task_dependency_fk')
+        return relationship.many_to_many(cls, 'task', 'dependent', other_property='dependencies')
 
     def has_ended(self):
         if self.stub_type is not None:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/092b45f0/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 826a7a2..67e8c1d 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -42,13 +42,13 @@ class Engine(logger.LoggerMixin):
         self._workflow_context = workflow_context
         self._execution_graph = networkx.DiGraph()
         self._executor_kwargs = executor_kwargs
-        translation.build_execution_graph(task_graph=tasks_graph,
-                                          execution_graph=self._execution_graph,
-                                          default_executor=executor,
-                                          execution=workflow_context.execution)
+        translation.store_tasks(task_graph=tasks_graph,
+                                execution_graph=self._execution_graph,
+                                default_executor=executor,
+                                execution=workflow_context.execution)
 
         # Flush changes
-        workflow_context.model.execution._session.flush()
+        workflow_context.model.execution.update(workflow_context.execution)
 
     def execute(self):
         """
@@ -106,11 +106,7 @@ class Engine(logger.LoggerMixin):
 
     def _tasks_iter(self):
         for _, data in self._execution_graph.nodes_iter(data=True):
-            task = data['task']
-            if isinstance(task, models.Task):
-                if not task.has_ended():
-                    self._workflow_context.model.task.refresh(task)
-            yield task
+            yield self._workflow_context.model.task.get(data['task'].id)
 
     def _handle_executable_task(self, task):
         if not task.stub_type:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/092b45f0/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index be4e34d..8da9bd7 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -22,14 +22,10 @@ from .. import api
 from ..executor import base
 
 
-def build_execution_graph(
-        task_graph,
-        execution_graph,
-        default_executor,
-        execution,
-        start_stub_type=models.Task.START_WORKFLOW,
-        end_stub_type=models.Task.END_WORKFLOW,
-        depends_on=()):
+def store_tasks(ctx, task_graph, default_executor, execution,
+                start_stub_type=models.Task.START_WORKFLOW,
+                end_stub_type=models.Task.END_WORKFLOW,
+                depends_on=()):
     """
     Translates the user graph to the execution graph
     :param task_graph: The user's graph
@@ -39,27 +35,28 @@ def build_execution_graph(
     :param end_stub_type: internal use
     :param depends_on: internal use
     """
+    depends_on = list(depends_on)
+
     # Insert start marker
     start_task = models.Task(api_id=_start_graph_suffix(task_graph.id),
                              _executor=base.StubTaskExecutor,
                              execution=execution,
-                             stub_type=start_stub_type)
-    _add_task_and_dependencies(execution_graph, start_task, depends_on)
+                             stub_type=start_stub_type,
+                             dependencies=depends_on)
 
     for api_task in task_graph.topological_order(reverse=True):
         dependencies = task_graph.get_dependencies(api_task)
-        operation_dependencies = _get_tasks_from_dependencies(
-            execution_graph, dependencies, default=[start_task])
+        operation_dependencies = _get_tasks_from_dependencies(ctx, dependencies, [start_task])
 
         if isinstance(api_task, api.task.OperationTask):
-            operation_task = models.Task.from_api_task(api_task=api_task,
-                                                       executor=default_executor)
-            _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
+             models.Task.from_api_task(
+                 api_task=api_task, executor=default_executor, dependencies=operation_dependencies)
+
         elif isinstance(api_task, api.task.WorkflowTask):
             # Build the graph recursively while adding start and end markers
-            build_execution_graph(
+            store_tasks(
+                ctx=ctx,
                 task_graph=api_task,
-                execution_graph=execution_graph,
                 default_executor=default_executor,
                 execution=execution,
                 start_stub_type=models.Task.START_SUBWROFKLOW,
@@ -67,33 +64,41 @@ def build_execution_graph(
                 depends_on=operation_dependencies
             )
         elif isinstance(api_task, api.task.StubTask):
-            stub_task = models.Task(api_id=api_task.id,
-                                    _executor=base.StubTaskExecutor,
-                                    execution=execution,
-                                    stub_type=models.Task.STUB)
-            _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies)
+            models.Task(api_id=api_task.id,
+                        _executor=base.StubTaskExecutor,
+                        execution=execution,
+                        stub_type=models.Task.STUB,
+                        dependencies=operation_dependencies)
         else:
             raise RuntimeError('Undefined state')
 
     # Insert end marker
-    workflow_dependencies = _get_tasks_from_dependencies(
-        execution_graph,
-        _get_non_dependency_tasks(task_graph),
-        default=[start_task])
-    end_task = models.Task(api_id=_end_graph_suffix(task_graph.id),
-                           _executor=base.StubTaskExecutor,
-                           execution=execution,
-                           stub_type=end_stub_type)
-    _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies)
+    workflow_dependencies = [task for task in ctx.model.task.list() if not task.dependent_tasks]
+    models.Task(api_id=_end_graph_suffix(task_graph.id),
+                _executor=base.StubTaskExecutor,
+                execution=execution,
+                stub_type=end_stub_type,
+                dependencies=workflow_dependencies)
+
+    ctx.model.execution.update(execution)
+
+def _start_graph_suffix(api_id):
+    return '{0}-Start'.format(api_id)
+
+
+def _end_graph_suffix(api_id):
+    return '{0}-End'.format(api_id)
 
 
-def _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies=()):
-    execution_graph.add_node(operation_task.api_id, task=operation_task)
-    for dependency in operation_dependencies:
-        execution_graph.add_edge(dependency.api_id, operation_task.api_id)
+def construct_graph(graph, execution):
+    for task in execution.tasks:
+        for dependency in task.dependencies:
+            graph.add_edge(dependency, task)
 
+    return graph
 
-def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
+
+def _get_tasks_from_dependencies(ctx, dependencies, default=()):
     """
     Returns task list from dependencies.
     """
@@ -103,19 +108,5 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
             dependency_name = dependency.id
         else:
             dependency_name = _end_graph_suffix(dependency.id)
-        tasks.append(execution_graph.node[dependency_name]['task'])
+        tasks.extend(list(ctx.model.task.list(filters={'name': dependency_name})))
     return tasks or default
-
-
-def _start_graph_suffix(api_id):
-    return '{0}-Start'.format(api_id)
-
-
-def _end_graph_suffix(api_id):
-    return '{0}-End'.format(api_id)
-
-
-def _get_non_dependency_tasks(graph):
-    for task in graph.tasks:
-        if len(list(graph.get_dependents(task))) == 0:
-            yield task

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/092b45f0/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 398ca7e..4abed37 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -66,11 +66,12 @@ def test_task_graph_into_execution_graph(tmpdir):
     test_task_graph.add_dependency(simple_after_task, inner_task_graph)
 
     # Direct check
-    execution_graph = DiGraph()
-    core.translation.build_execution_graph(task_graph=test_task_graph,
-                                           execution_graph=execution_graph,
-                                           execution=task_context.model.execution.list()[0],
-                                           default_executor=base.StubTaskExecutor)
+    core.translation.store_tasks(ctx=task_context,
+                                 task_graph=test_task_graph,
+                                 execution=task_context.model.execution.list()[0],
+                                 default_executor=base.StubTaskExecutor)
+
+    execution_graph = core.translation.construct_graph(DiGraph(), task_context.execution)
     execution_tasks = topological_sort(execution_graph)
 
     assert len(execution_tasks) == 7
@@ -85,7 +86,7 @@ def test_task_graph_into_execution_graph(tmpdir):
         '{0}-End'.format(test_task_graph.id)
     ]
 
-    assert expected_tasks_names == execution_tasks
+    assert expected_tasks_names == [t.api_id for t in execution_tasks]
     assert all(isinstance(_get_task_by_name(task_name, execution_graph), models.Task)
                for task_name in execution_tasks)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/092b45f0/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 058190e..bca2ea3 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -30,7 +30,6 @@ from tests.fixtures import (  # pylint: disable=unused-import
     plugin_manager,
     fs_model as model
 )
-from . import MockTask
 
 
 class TestProcessExecutor(object):