You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/13 17:33:39 UTC
incubator-ariatosca git commit: graph work
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-278-Remove-core-tasks 979a4b445 -> 092b45f0d
graph work
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/092b45f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/092b45f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/092b45f0
Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 092b45f0de707cc9c1ed8bebe594bb0b7bb5846f
Parents: 979a4b4
Author: max-orlov <ma...@gigaspaces.com>
Authored: Tue Jun 13 20:33:34 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue Jun 13 20:33:34 2017 +0300
----------------------------------------------------------------------
aria/modeling/orchestration.py | 10 +--
aria/orchestrator/workflows/core/engine.py | 16 ++--
aria/orchestrator/workflows/core/translation.py | 93 +++++++++-----------
.../test_task_graph_into_execution_graph.py | 13 +--
.../workflows/executor/test_process_executor.py | 1 -
5 files changed, 60 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/092b45f0/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 37aa431..4a5771a 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -400,14 +400,14 @@ class TaskBase(mixins.ModelMixin):
def retry(message=None, retry_interval=None):
raise TaskRetryException(message, retry_interval=retry_interval)
- @declared_attr
- def operation_task_dependency_fk(cls):
- """For Type one-to-many to Type"""
- return relationship.foreign_key('task', nullable=True)
+ # @declared_attr
+ # def dependency_fk(cls):
+ # """For Type one-to-many to Type"""
+ # return relationship.foreign_key('task', nullable=True)
@declared_attr
def dependent_tasks(cls):
- return relationship.one_to_many_self(cls, 'operation_task_dependency_fk')
+ return relationship.many_to_many(cls, 'task', 'dependent', other_property='dependencies')
def has_ended(self):
if self.stub_type is not None:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/092b45f0/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 826a7a2..67e8c1d 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -42,13 +42,13 @@ class Engine(logger.LoggerMixin):
self._workflow_context = workflow_context
self._execution_graph = networkx.DiGraph()
self._executor_kwargs = executor_kwargs
- translation.build_execution_graph(task_graph=tasks_graph,
- execution_graph=self._execution_graph,
- default_executor=executor,
- execution=workflow_context.execution)
+ translation.store_tasks(task_graph=tasks_graph,
+ execution_graph=self._execution_graph,
+ default_executor=executor,
+ execution=workflow_context.execution)
# Flush changes
- workflow_context.model.execution._session.flush()
+ workflow_context.model.execution.update(workflow_context.execution)
def execute(self):
"""
@@ -106,11 +106,7 @@ class Engine(logger.LoggerMixin):
def _tasks_iter(self):
for _, data in self._execution_graph.nodes_iter(data=True):
- task = data['task']
- if isinstance(task, models.Task):
- if not task.has_ended():
- self._workflow_context.model.task.refresh(task)
- yield task
+ yield self._workflow_context.model.task.get(data['task'].id)
def _handle_executable_task(self, task):
if not task.stub_type:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/092b45f0/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index be4e34d..8da9bd7 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -22,14 +22,10 @@ from .. import api
from ..executor import base
-def build_execution_graph(
- task_graph,
- execution_graph,
- default_executor,
- execution,
- start_stub_type=models.Task.START_WORKFLOW,
- end_stub_type=models.Task.END_WORKFLOW,
- depends_on=()):
+def store_tasks(ctx, task_graph, default_executor, execution,
+ start_stub_type=models.Task.START_WORKFLOW,
+ end_stub_type=models.Task.END_WORKFLOW,
+ depends_on=()):
"""
Translates the user graph to the execution graph
:param task_graph: The user's graph
@@ -39,27 +35,28 @@ def build_execution_graph(
:param end_stub_type: internal use
:param depends_on: internal use
"""
+ depends_on = list(depends_on)
+
# Insert start marker
start_task = models.Task(api_id=_start_graph_suffix(task_graph.id),
_executor=base.StubTaskExecutor,
execution=execution,
- stub_type=start_stub_type)
- _add_task_and_dependencies(execution_graph, start_task, depends_on)
+ stub_type=start_stub_type,
+ dependencies=depends_on)
for api_task in task_graph.topological_order(reverse=True):
dependencies = task_graph.get_dependencies(api_task)
- operation_dependencies = _get_tasks_from_dependencies(
- execution_graph, dependencies, default=[start_task])
+ operation_dependencies = _get_tasks_from_dependencies(ctx, dependencies, [start_task])
if isinstance(api_task, api.task.OperationTask):
- operation_task = models.Task.from_api_task(api_task=api_task,
- executor=default_executor)
- _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
+ models.Task.from_api_task(
+ api_task=api_task, executor=default_executor, dependencies=operation_dependencies)
+
elif isinstance(api_task, api.task.WorkflowTask):
# Build the graph recursively while adding start and end markers
- build_execution_graph(
+ store_tasks(
+ ctx=ctx,
task_graph=api_task,
- execution_graph=execution_graph,
default_executor=default_executor,
execution=execution,
start_stub_type=models.Task.START_SUBWROFKLOW,
@@ -67,33 +64,41 @@ def build_execution_graph(
depends_on=operation_dependencies
)
elif isinstance(api_task, api.task.StubTask):
- stub_task = models.Task(api_id=api_task.id,
- _executor=base.StubTaskExecutor,
- execution=execution,
- stub_type=models.Task.STUB)
- _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies)
+ models.Task(api_id=api_task.id,
+ _executor=base.StubTaskExecutor,
+ execution=execution,
+ stub_type=models.Task.STUB,
+ dependencies=operation_dependencies)
else:
raise RuntimeError('Undefined state')
# Insert end marker
- workflow_dependencies = _get_tasks_from_dependencies(
- execution_graph,
- _get_non_dependency_tasks(task_graph),
- default=[start_task])
- end_task = models.Task(api_id=_end_graph_suffix(task_graph.id),
- _executor=base.StubTaskExecutor,
- execution=execution,
- stub_type=end_stub_type)
- _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies)
+ workflow_dependencies = [task for task in ctx.model.task.list() if not task.dependent_tasks]
+ models.Task(api_id=_end_graph_suffix(task_graph.id),
+ _executor=base.StubTaskExecutor,
+ execution=execution,
+ stub_type=end_stub_type,
+ dependencies=workflow_dependencies)
+
+ ctx.model.execution.update(execution)
+
+def _start_graph_suffix(api_id):
+ return '{0}-Start'.format(api_id)
+
+
+def _end_graph_suffix(api_id):
+ return '{0}-End'.format(api_id)
-def _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies=()):
- execution_graph.add_node(operation_task.api_id, task=operation_task)
- for dependency in operation_dependencies:
- execution_graph.add_edge(dependency.api_id, operation_task.api_id)
+def construct_graph(graph, execution):
+ for task in execution.tasks:
+ for dependency in task.dependencies:
+ graph.add_edge(dependency, task)
+ return graph
-def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
+
+def _get_tasks_from_dependencies(ctx, dependencies, default=()):
"""
Returns task list from dependencies.
"""
@@ -103,19 +108,5 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
dependency_name = dependency.id
else:
dependency_name = _end_graph_suffix(dependency.id)
- tasks.append(execution_graph.node[dependency_name]['task'])
+ tasks.extend(list(ctx.model.task.list(filters={'name': dependency_name})))
return tasks or default
-
-
-def _start_graph_suffix(api_id):
- return '{0}-Start'.format(api_id)
-
-
-def _end_graph_suffix(api_id):
- return '{0}-End'.format(api_id)
-
-
-def _get_non_dependency_tasks(graph):
- for task in graph.tasks:
- if len(list(graph.get_dependents(task))) == 0:
- yield task
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/092b45f0/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 398ca7e..4abed37 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -66,11 +66,12 @@ def test_task_graph_into_execution_graph(tmpdir):
test_task_graph.add_dependency(simple_after_task, inner_task_graph)
# Direct check
- execution_graph = DiGraph()
- core.translation.build_execution_graph(task_graph=test_task_graph,
- execution_graph=execution_graph,
- execution=task_context.model.execution.list()[0],
- default_executor=base.StubTaskExecutor)
+ core.translation.store_tasks(ctx=task_context,
+ task_graph=test_task_graph,
+ execution=task_context.model.execution.list()[0],
+ default_executor=base.StubTaskExecutor)
+
+ execution_graph = core.translation.construct_graph(DiGraph(), task_context.execution)
execution_tasks = topological_sort(execution_graph)
assert len(execution_tasks) == 7
@@ -85,7 +86,7 @@ def test_task_graph_into_execution_graph(tmpdir):
'{0}-End'.format(test_task_graph.id)
]
- assert expected_tasks_names == execution_tasks
+ assert expected_tasks_names == [t.api_id for t in execution_tasks]
assert all(isinstance(_get_task_by_name(task_name, execution_graph), models.Task)
for task_name in execution_tasks)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/092b45f0/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 058190e..bca2ea3 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -30,7 +30,6 @@ from tests.fixtures import ( # pylint: disable=unused-import
plugin_manager,
fs_model as model
)
-from . import MockTask
class TestProcessExecutor(object):